Hadoop Custom Sorting (Total Order)

The MapReduce job below implements a global (total) sort across two reducers. A custom Partitioner splits the integer keys at the midpoint of a user-supplied [min, max] range: keys at or below the median go to partition 0 and keys above it go to partition 1. Because the shuffle sorts keys within each partition, the two output files, read in partition order, together form one globally sorted result.

package com.zhiyou.bd23.totalorder;

import java.io.IOException;

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TotalOrder2 {
	// Custom partitioner: assigns keys to the two reducers so that partition
	// numbers follow key order (every key in partition 0 is <= every key in partition 1)
	public static class TotalOrder2Partitioner extends Partitioner<IntWritable, Text> implements Configurable{
		private Configuration configuration;
		@Override
		public int getPartition(IntWritable key, Text value, int numPartitions) {
			// Midpoint of the user-supplied key range; returning only 0 or 1 assumes exactly two reducers
			String maxValue = configuration.get("maxValue");
			String minValue = configuration.get("minValue");
			int median = (Integer.valueOf(maxValue) + Integer.valueOf(minValue)) / 2;
			if (key.get() > median) {
				return 1;
			}else{
				return 0;
			}
		}
		@Override
		public void setConf(Configuration conf) {
			this.configuration = conf;
		}
		@Override
		public Configuration getConf() {
			return configuration;
		}
	}
	// Mapper: parses each line as "<name> <number>" and emits (number, name)
	// so that records are shuffled and sorted by the integer value
	public static class TotalOrder2Map extends Mapper<LongWritable, Text, IntWritable, Text>{
		private String[] infos;
		private IntWritable outputKey = new IntWritable();
		private Text outputValue = new Text();
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
				throws IOException, InterruptedException {
			// Split on whitespace; skip lines that do not have exactly two fields
			infos = value.toString().split("\\s+");
			if(infos.length == 2){
				outputKey.set(Integer.valueOf(infos[1]));
				outputValue.set(infos[0]);
				context.write(outputKey, outputValue);
			}
		}
	}
	// Reducer: keys arrive sorted within each partition, so writing each
	// (name, value) pair back out yields one sorted file per reducer
	public static class TotalOrder2Reduce extends Reducer<IntWritable, Text, Text, IntWritable>{
		@Override
		protected void reduce(IntWritable key, Iterable<Text> values,
				Reducer<IntWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
			for(Text value:values){
				context.write(value, key); 
			}
		}
	}
	
	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		configuration.set("maxValue", args[0]);
		configuration.set("minValue", args[1]);
		Job job = Job.getInstance(configuration);
		job.setJarByClass(TotalOrder2.class);
		job.setJobName("custom total order");
		job.setMapperClass(TotalOrder2Map.class);
		job.setReducerClass(TotalOrder2Reduce.class);
		job.setPartitionerClass(TotalOrder2Partitioner.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		Path inputPath = new Path(args[2]);
		Path outputDir = new Path(args[3]);
		// Delete the output directory if it already exists so the job can be rerun
		outputDir.getFileSystem(configuration).delete(outputDir, true);
		FileInputFormat.addInputPath(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputDir);
		
		// Two reduce tasks, one per partition produced by the custom partitioner
		job.setNumReduceTasks(2);
		
		System.exit(job.waitForCompletion(true)?0:1);
	}
}
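
A minimal sketch of how this job might be invoked, assuming the class is packaged into a jar named totalorder2.jar (a hypothetical name) and the key range of the input is known in advance:

hadoop jar totalorder2.jar com.zhiyou.bd23.totalorder.TotalOrder2 <maxValue> <minValue> <inputPath> <outputDir>

For example, with maxValue=10 and minValue=0 the median is 5, so an input of "a 3", "b 1", "c 7", "d 5" produces part-r-00000 containing b 1, a 3, d 5 (keys at or below 5, in sorted order) and part-r-00001 containing c 7 (keys above 5). Concatenating the part files in partition order gives the globally sorted output.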

