Computing an average with Hadoop



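A combiner runs locally on the node that ran the map tasks, while the reducer collects the output of every map task. Take a large input file, say 1 GB. Assuming 64 MB HDFS blocks, that is 16 blocks. On a cluster of 5 dual-core machines the 16 blocks are spread over 10 cores, so the map phase takes roughly 2 rounds. Suppose blocks 1 and 2 go to machine 1 and blocks 3 and 4 go to machine 2. Once blocks 1 and 2 have been processed, the combiner runs on machine 1 over their output; once blocks 3 and 4 are done, it runs on machine 2. When all combiners have finished, their output is gathered at the reducer for final processing. Because an average of local averages is not the overall average, the combiner here emits a partial sum and a count, and the reducer does the division.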
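The walk-through above can be sketched in plain Java, outside Hadoop. This is only an illustration: the class `PartialAverageDemo`, its helpers, the 64 MB block size, and the sample values are all assumptions made for the example, not part of the original program.

```java
import java.util.Arrays;
import java.util.List;

public class PartialAverageDemo {
    /** A partial result: running sum and number of values seen. */
    static final class Partial {
        final double sum;
        final long count;
        Partial(double sum, long count) { this.sum = sum; this.count = count; }
        Partial merge(Partial other) { return new Partial(sum + other.sum, count + other.count); }
        double average() { return sum / count; }
    }

    /** Rounds needed to run `tasks` map tasks on `slots` parallel slots. */
    static int rounds(int tasks, int slots) {
        return (tasks + slots - 1) / slots; // ceiling division
    }

    /** What a combiner does locally: fold raw values into one partial. */
    static Partial combine(List<Double> values) {
        double sum = 0.0;
        for (double v : values) sum += v;
        return new Partial(sum, values.size());
    }

    public static void main(String[] args) {
        // 1 GB in 64 MB blocks (assumed) -> 16 map tasks; 5 dual-core machines -> 10 slots.
        System.out.println("rounds = " + rounds(1024 / 64, 5 * 2));

        // Hypothetical values for one key, as seen on two machines.
        Partial machine1 = combine(Arrays.asList(90.0, 80.0, 70.0));
        Partial machine2 = combine(Arrays.asList(60.0));

        // Reducer side: merge the partials, divide once at the end.
        double correct = machine1.merge(machine2).average();
        // Averaging the local averages gives machine 2's single value too much weight.
        double naive = (machine1.average() + machine2.average()) / 2;

        System.out.println("correct = " + correct); // 75.0
        System.out.println("naive   = " + naive);   // 70.0
    }
}
```

With these sample values the merged partials give 300 / 4 = 75.0, while averaging the two local averages gives (80 + 60) / 2 = 70.0, which is why the combiner passes along a sum and a count rather than an average.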


Input (data taken from the internet):

data1:



data2:



Program source code:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CountAverage {
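	// Mapper: each input line is "<key> <value>". Emits (key, "value,1") so that
	// map output already has the "sum,count" shape used by the combiner and reducer.
	public static class AverageMapper extends Mapper<Object, Text, Text, Text>{
		 @Override
		 public void map(Object key, Text value, Context context) 
		    throws IOException, InterruptedException{
			 StringTokenizer itr = new StringTokenizer(value.toString());
			 if (itr.countTokens() < 2) {
				 return; // skip blank or malformed lines
			 }
			 String mapkey = itr.nextToken();
			 String mapvalue = "";
			 while (itr.hasMoreTokens()) {
				 mapvalue = itr.nextToken(); // as in the original loop, the last token is the value
			 }
			 context.write(new Text(mapkey), new Text(mapvalue + ",1"));
		 }
	}
	
	// Combiner: merges "sum,count" pairs locally. Input and output share one format
	// because Hadoop may run the combiner zero, one, or several times. A comma is
	// used as the separator so that negative values (leading '-') still parse.
	public static class AverageCombiner extends Reducer<Text,Text,Text,Text> {
		@Override
		public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
			double sum = 0.0;
			long count = 0;
			for (Text t : values) {
				String[] parts = t.toString().split(",");
				sum += Double.parseDouble(parts[0]);
				count += Long.parseLong(parts[1]);
			}
			context.write(key, new Text(sum + "," + count));
		}
	}
	
	// Reducer: merges all "sum,count" pairs for a key and divides once at the end.
	public static class AverageReducer extends Reducer<Text,Text,Text,DoubleWritable> {
		@Override
		public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
			double sum = 0.0;
			long count = 0;
			for (Text t : values) {
				String[] parts = t.toString().split(",");
				sum += Double.parseDouble(parts[0]);
				count += Long.parseLong(parts[1]);
			}
			context.write(key, new DoubleWritable(sum / count));
		}
	}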
	
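	/**
	 * Configures and submits the job.
	 *
	 * @param args command-line arguments (unused; the paths are hard-coded below)
	 * @throws IOException 
	 * @throws ClassNotFoundException 
	 * @throws InterruptedException 
	 */
	public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
		Configuration conf = new Configuration();
		// new Job(conf, name) is deprecated in Hadoop 2.x and later,
		// where Job.getInstance(conf, name) replaces it.
		Job job = new Job(conf, "count average");
		job.setJarByClass(CountAverage.class);
		job.setMapperClass(AverageMapper.class);
		job.setCombinerClass(AverageCombiner.class);
		job.setReducerClass(AverageReducer.class);
		// Map output is (Text, Text); the final output is (Text, DoubleWritable).
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DoubleWritable.class);
		FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/user/pu/input/*"));
		// The output path names a directory that must not exist yet, so it takes no wildcard.
		FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/user/pu/output"));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}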

}
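
Hadoop treats a combiner as an optional optimization and may apply it zero, one, or several times to the same map output, so the combiner's output has to be something the combiner itself (and the reducer) can read back. The plain-Java sketch below checks that property without a cluster. The class `CombinerFormatDemo`, its helpers, the `sum,count` text encoding, and the sample values are hypothetical choices for this illustration; the comma separator is picked so that a negative value's minus sign cannot be mistaken for a delimiter.

```java
import java.util.Arrays;
import java.util.List;

public class CombinerFormatDemo {
    /** Merge any number of "sum,count" strings into one "sum,count" string. */
    static String merge(List<String> partials) {
        double sum = 0.0;
        long count = 0;
        for (String p : partials) {
            String[] parts = p.split(",");
            sum += Double.parseDouble(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        return sum + "," + count;
    }

    /** Final step: turn one merged partial into an average. */
    static double average(String partial) {
        String[] parts = partial.split(",");
        return Double.parseDouble(parts[0]) / Long.parseLong(parts[1]);
    }

    public static void main(String[] args) {
        // Hypothetical map output for one key: every value starts out as "value,1".
        List<String> mapOutput = Arrays.asList("90.0,1", "80.0,1", "-10.0,1", "60.0,1");

        // Combiner skipped entirely: the final merge sees raw map output.
        double none = average(merge(mapOutput));

        // Combiner run once on each half of the map output.
        String left = merge(mapOutput.subList(0, 2));
        String right = merge(mapOutput.subList(2, 4));
        double once = average(merge(Arrays.asList(left, right)));

        // Combiner run a second time on already-combined output.
        String leftAgain = merge(Arrays.asList(left));
        double twice = average(merge(Arrays.asList(leftAgain, right)));

        System.out.println(none + " " + once + " " + twice); // 55.0 55.0 55.0
    }
}
```

All three paths give 220 / 4 = 55.0, since merging `sum,count` pairs is associative and the division happens only once, at the very end.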


Run results:




