Hadoop 自定义排序
package com.zhiyou.bd23.totalorder;
import java.io.IOException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TotalOrder2 {
// Custom partitioner: keeps the data assigned to the two reducers ordered by reducer index.
/**
 * Range partitioner for a two-reducer total-order sort.
 *
 * <p>The split point is the midpoint of the {@code minValue} and {@code maxValue} job
 * configuration entries. Keys {@code <= midpoint} go to partition 0 and keys {@code > midpoint}
 * go to partition 1. Every key in partition 0 is therefore <= every key in partition 1, so reading
 * reducer 0's output followed by reducer 1's output gives a globally ordered result.
 */
public static class TotalOrder2Partitioner extends Partitioner<IntWritable, Text> implements Configurable {
    private Configuration configuration;
    /** Cached split point; only meaningful while {@code medianCached} is true. */
    private int median;
    private boolean medianCached;

    /**
     * Routes a record to partition 0 or 1 depending on which side of the midpoint its key falls.
     *
     * @param key the numeric sort key emitted by the mapper
     * @param value the associated text value (not used for routing)
     * @param numPartitions number of reduce tasks; with fewer than two, every record goes to 0
     * @return 0 if {@code key <= midpoint} or only one partition exists, otherwise 1
     * @throws NumberFormatException if {@code maxValue}/{@code minValue} are missing or not ints
     */
    @Override
    public int getPartition(IntWritable key, Text value, int numPartitions) {
        // The result must lie in [0, numPartitions); returning 1 with a single reducer is illegal.
        if (numPartitions < 2) {
            return 0;
        }
        return key.get() > median() ? 1 : 0;
    }

    /** Parses the configured bounds once and caches their midpoint (getPartition runs per record). */
    private int median() {
        if (!medianCached) {
            int max = Integer.parseInt(configuration.get("maxValue"));
            int min = Integer.parseInt(configuration.get("minValue"));
            // Sum in long: max + min can overflow int for large bounds. The average itself always fits.
            median = (int) (((long) max + min) / 2);
            medianCached = true;
        }
        return median;
    }

    /** Stores the job configuration; the midpoint is recomputed lazily from the new bounds. */
    @Override
    public void setConf(Configuration conf) {
        this.configuration = conf;
        this.medianCached = false;
    }

    /** Returns the configuration last passed to {@link #setConf}. */
    @Override
    public Configuration getConf() {
        return configuration;
    }
}
//
/**
 * Parses input lines of the form {@code <name> <number>} and emits (number, name) so that the
 * shuffle groups and sorts records by number.
 *
 * <p>Lines that do not have exactly two whitespace-separated fields, or whose second field is not
 * an int, are skipped and counted under the {@code TotalOrder2/MALFORMED_LINES} counter.
 */
public static class TotalOrder2Map extends Mapper<LongWritable, Text, IntWritable, Text> {
    /** Field separator, compiled once instead of on every String.split call. */
    private static final Pattern FIELD_SEPARATOR = Pattern.compile("\\s+");
    // Output writables are reused across calls to avoid per-record allocation.
    private final IntWritable outputKey = new IntWritable();
    private final Text outputValue = new Text();

    /**
     * Emits one (number, name) record for a well-formed line.
     *
     * @param key byte offset of the line (unused)
     * @param value the raw input line
     * @param context sink for map output
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
            throws IOException, InterruptedException {
        // trim() first: leading whitespace would otherwise produce an empty first field and drop the line.
        String[] fields = FIELD_SEPARATOR.split(value.toString().trim());
        if (fields.length != 2) {
            context.getCounter("TotalOrder2", "MALFORMED_LINES").increment(1);
            return;
        }
        int number;
        try {
            number = Integer.parseInt(fields[1]);
        } catch (NumberFormatException e) {
            // Same policy as wrong field counts: skip the line rather than failing the whole task.
            context.getCounter("TotalOrder2", "MALFORMED_LINES").increment(1);
            return;
        }
        outputKey.set(number);
        outputValue.set(fields[0]);
        context.write(outputKey, outputValue);
    }
}
/**
 * Writes every grouped record back out in (name, number) column order.
 *
 * <p>Relies on Hadoop delivering keys to each reducer in sorted order, so each reducer's output
 * is sorted by number.
 */
public static class TotalOrder2Reduce extends Reducer<IntWritable, Text, Text, IntWritable> {
    /**
     * Emits one output record per name grouped under {@code key}.
     *
     * @param key the number shared by every name in {@code values}
     * @param values the names that were emitted with this number
     * @param context sink for the (name, number) output records
     */
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text name : values) {
            // Swap back from (number, name) to the original (name, number) layout.
            context.write(name, key);
        }
    }
}
/**
 * Configures and runs the two-reducer total-order sort job.
 *
 * <p>Arguments: {@code <maxValue> <minValue> <inputPath> <outputDir>}. The bounds are passed to
 * the partitioner through the job configuration. The output directory is deleted first if it
 * already exists.
 *
 * @param args command-line arguments as described above
 * @throws NumberFormatException if {@code maxValue} or {@code minValue} is not an int
 * @throws Exception if job setup or execution fails
 */
public static void main(String[] args) throws Exception {
    if (args.length < 4) {
        System.err.println("Usage: TotalOrder2 <maxValue> <minValue> <inputPath> <outputDir>");
        System.exit(2);
        return;
    }
    Configuration configuration = new Configuration();
    // Parse on the client so bad bounds fail immediately rather than inside every map task.
    // Set before Job.getInstance, which copies the configuration.
    configuration.setInt("maxValue", Integer.parseInt(args[0]));
    configuration.setInt("minValue", Integer.parseInt(args[1]));

    Job job = Job.getInstance(configuration);
    job.setJarByClass(TotalOrder2.class);
    job.setJobName("自实现全排序");
    job.setMapperClass(TotalOrder2Map.class);
    job.setReducerClass(TotalOrder2Reduce.class);
    job.setPartitionerClass(TotalOrder2Partitioner.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);

    Path inputPath = new Path(args[2]);
    Path outputDir = new Path(args[3]);
    // Remove previous output so the job can be re-run; FileOutputFormat rejects an existing output dir.
    outputDir.getFileSystem(configuration).delete(outputDir, true);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputDir);

    // Two reduce tasks: one for each side of the partitioner's midpoint.
    job.setNumReduceTasks(2);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
本站文章为和通数据库网友分享或投稿，欢迎任何形式的转载，但请务必注明出处。
同时，如文章内容侵犯了您的权益，请联系 QQ：970679559，我们会尽快处理。