欢迎投稿

今日深度:

hadoop常用算法简单实例,hadoop实例

hadoop常用算法简单实例,hadoop实例


实例一、对以下数据进行排序,根据收入减去支出得到最后结余从大到小排序,数据如下:


SumStep运行之后结果如下:


SortStep运行之后,结果为将上图数据按结余从大到小排序后的内容。

代码如下:

public class InfoBean implements WritableComparable<InfoBean>{

	private String account;
	
	private double income;
	
	private double expenses;
	
	private double surplus;
	
	public void set(String account, double income, double expenses){
		this.account = account;
		this.income = income;
		this.expenses = expenses;
		this.surplus = income - expenses;
	}
	
	@Override
	public String toString() {
		return this.income + "\t" + this.expenses + "\t" + this.surplus;
	}

	/**
	 * serialize
	 */
	public void write(DataOutput out) throws IOException {
		out.writeUTF(account);
		out.writeDouble(income);
		out.writeDouble(expenses);
		out.writeDouble(surplus);
	}

	/**
	 * deserialize
	 */
	public void readFields(DataInput in) throws IOException {
		this.account = in.readUTF();
		this.income = in.readDouble();
		this.expenses = in.readDouble();
		this.surplus = in.readDouble();
	}
	

	public int compareTo(InfoBean o) {
		if(this.income == o.getIncome()){
			return this.expenses > o.getExpenses() ? 1 : -1; 
		} else {
			return this.income > o.getIncome() ? -1 : 1;
		}
	}

	public String getAccount() {
		return account;
	}

	public void setAccount(String account) {
		this.account = account;
	}

	public double getIncome() {
		return income;
	}

	public void setIncome(double income) {
		this.income = income;
	}

	public double getExpenses() {
		return expenses;
	}

	public void setExpenses(double expenses) {
		this.expenses = expenses;
	}

	public double getSurplus() {
		return surplus;
	}

	public void setSurplus(double surplus) {
		this.surplus = surplus;
	}

	
}
/**
 * First job of example 1: sums income and expenses per account.
 *
 * <p>Input: tab-separated lines {@code account \t income \t expenses}.
 * Output: lines {@code account \t totalIncome \t totalExpenses \t surplus}. SortStep reads
 * this format as its input.
 *
 * <p>Usage: {@code SumStep <input path> <output path>}
 */
public class SumStep {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(SumStep.class);

		job.setMapperClass(SumMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(InfoBean.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		job.setReducerClass(SumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(InfoBean.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Report job failure through the exit status instead of always exiting with 0.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	/** Parses one input line and emits {@code (account, bean)}. */
	public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean>{

		private final InfoBean bean = new InfoBean();
		private final Text k = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Columns: account, income, expenses.
			String[] fields = value.toString().split("\t");
			String account = fields[0];
			double income = Double.parseDouble(fields[1]);
			double expenses = Double.parseDouble(fields[2]);
			k.set(account);
			bean.set(account, income, expenses);
			context.write(k, bean);
		}
	}

	/** Sums income and expenses of all records of one account. */
	public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean>{

		// Named differently from the loop variable below, so there is no field shadowing.
		private final InfoBean result = new InfoBean();

		@Override
		protected void reduce(Text key, Iterable<InfoBean> v2s, Context context)
				throws IOException, InterruptedException {
			double inSum = 0;
			double outSum = 0;
			for (InfoBean item : v2s) {
				inSum += item.getIncome();
				outSum += item.getExpenses();
			}
			// Keep the real account in the bean. The text output is unchanged because
			// InfoBean.toString() does not print the account.
			result.set(key.toString(), inSum, outSum);
			context.write(key, result);
		}
	}
}

SortStep的输入是SumStep的输出,而不是原始数据文件;当然也可以将两个job串联在同一个程序中执行,此处不再讨论。
/**
 * Second job of example 1: sorts the SumStep output by InfoBean's natural ordering.
 *
 * <p>Input: lines {@code account \t income \t expenses \t surplus} produced by SumStep.
 * The surplus column is ignored and recomputed by {@link InfoBean#set}.
 *
 * <p>Usage: {@code SortStep <input path> <output path>}
 */
public class SortStep {

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(SortStep.class);

		// The bean itself is the map output key, so the shuffle sorts by InfoBean.compareTo.
		job.setMapperClass(SortMapper.class);
		job.setMapOutputKeyClass(InfoBean.class);
		job.setMapOutputValueClass(NullWritable.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		job.setReducerClass(SortReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(InfoBean.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Report job failure through the exit status instead of always exiting with 0.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	/** Turns each line into an InfoBean key with no value. */
	public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable>{

		private final InfoBean bean = new InfoBean();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\t");
			String account = fields[0];
			double income = Double.parseDouble(fields[1]);
			double expenses = Double.parseDouble(fields[2]);
			bean.set(account, income, expenses);
			context.write(bean, NullWritable.get());
		}
	}

	/** Writes the sorted beans back out as {@code account \t income \t expenses \t surplus}. */
	public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{

		private final Text k = new Text();

		@Override
		protected void reduce(InfoBean bean, Iterable<NullWritable> v2s, Context context)
				throws IOException, InterruptedException {
			// Emit once per value, not once per group. When several records compare as equal
			// they share one reduce() call. The framework refills the reused key object as the
			// values are iterated, so each record of the group is written; none are dropped.
			for (NullWritable ignored : v2s) {
				k.set(bean.getAccount());
				context.write(k, bean);
			}
		}
	}
}

实例二、倒排索引,过程如下
Map阶段
<0,"hello tom">
....


context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);
context.write("hello->a.txt",1);

context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
context.write("hello->b.txt",1);
--------------------------------------------------------
combiner阶段
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>
<"hello->a.txt",1>

<"hello->b.txt",1>
<"hello->b.txt",1>
<"hello->b.txt",1>

context.write("hello","a.txt->5");
context.write("hello","b.txt->3");
--------------------------------------------------------
Reducer阶段
<"hello",{"a.txt->5","b.txt->3"}>


context.write("hello","a.txt->5 b.txt->3");
-------------------------------------------------------
hello	"a.txt->5 b.txt->3"
tom		"a.txt->2 b.txt->1"
kitty	"a.txt->1"
.......
代码如下:

/**
 * Example 2: builds an inverted index {@code word \t file->count file->count ...}.
 *
 * <p>How it works:
 * <ul>
 *   <li>The mapper emits {@code (word, "file->1")} for every word.</li>
 *   <li>The combiner merges the {@code "file->count"} values of one word, summing per file.</li>
 *   <li>The reducer does the same merge and joins the per-file totals into one line.</li>
 * </ul>
 *
 * <p>The combiner keeps the key unchanged and consumes and produces the same value format.
 * The result is therefore correct whether Hadoop runs it zero, one or several times, and the
 * default hash partitioner sends all records of a word to the same reducer. The previous
 * version rewrote the key inside the combiner, which breaks both guarantees.
 *
 * <p>Usage: {@code InverseIndex <input path> <output path>}
 */
public class InverseIndex {

	/** Separator between a file name and its count inside a value. */
	private static final String SEP = "->";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);
		// Jar containing the mapper/combiner/reducer classes.
		job.setJarByClass(InverseIndex.class);

		// Mapper settings.
		job.setMapperClass(IndexMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// Reducer settings.
		job.setReducerClass(IndexReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		// Optional map-side pre-aggregation; safe to run any number of times.
		job.setCombinerClass(IndexCombiner.class);

		// Submit and wait; report failure through the exit status.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	/**
	 * Sums the {@code "file->count"} values of one word per file.
	 *
	 * @param values values in {@code "file->count"} format
	 * @return per-file totals ordered by file name
	 */
	private static java.util.SortedMap<String, Long> mergeCounts(Iterable<Text> values) {
		java.util.SortedMap<String, Long> counts = new java.util.TreeMap<String, Long>();
		for (Text t : values) {
			String s = t.toString();
			// lastIndexOf: the count never contains the separator, but a file name might.
			int idx = s.lastIndexOf(SEP);
			String file = s.substring(0, idx);
			long n = Long.parseLong(s.substring(idx + SEP.length()));
			Long prev = counts.get(file);
			counts.put(file, prev == null ? n : prev + n);
		}
		return counts;
	}

	/** Emits {@code (word, "file->1")} for every space-separated word of a line. */
	public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text>{

		private final Text k = new Text();
		private final Text v = new Text();

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			// The file name is the same for the whole split, so resolve it once.
			FileSplit inputSplit = (FileSplit) context.getInputSplit();
			v.set(inputSplit.getPath().getName() + SEP + "1");
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			for (String word : value.toString().split(" ")) {
				// Consecutive spaces produce empty tokens, which are not words.
				if (word.isEmpty()) {
					continue;
				}
				k.set(word);
				context.write(k, v);
			}
		}
	}

	/** Pre-aggregates per-file counts of one word; output format equals input format. */
	public static class IndexCombiner extends Reducer<Text, Text, Text, Text>{

		private final Text v = new Text();

		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			for (java.util.Map.Entry<String, Long> e : mergeCounts(values).entrySet()) {
				v.set(e.getKey() + SEP + e.getValue());
				context.write(key, v);
			}
		}
	}

	/** Produces {@code word \t file->count file->count ...}, files in name order. */
	public static class IndexReducer extends Reducer<Text, Text, Text, Text>{

		private final Text v = new Text();

		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			StringBuilder sb = new StringBuilder();
			for (java.util.Map.Entry<String, Long> e : mergeCounts(values).entrySet()) {
				if (sb.length() > 0) {
					sb.append(' ');
				}
				sb.append(e.getKey()).append(SEP).append(e.getValue());
			}
			v.set(sb.toString());
			context.write(key, v);
		}
	}
}

实例三、使用Partitioner使相同或者相似的数据传递到相同的reduce:

数据格式如下,分别代表手机号,上行流量,下行流量:


代码如下:

public class DataBean implements Writable {

	private String telNo;
	
	private long upPayLoad;
	
	private long downPayLoad;
	
	private long totalPayLoad;
	
	public DataBean(){}
	
	public DataBean(String telNo, long upPayLoad, long downPayLoad) {
		super();
		this.telNo = telNo;
		this.upPayLoad = upPayLoad;
		this.downPayLoad = downPayLoad;
		this.totalPayLoad = upPayLoad + downPayLoad;
	}
	
	@Override
	public String toString() {
		return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;
	}

	/**
	 * 序列化
	 * 注意:1.类型 2.顺序
	 */
	public void write(DataOutput out) throws IOException {
		out.writeUTF(telNo);
		out.writeLong(upPayLoad);
		out.writeLong(downPayLoad);
		out.writeLong(totalPayLoad);
	}

	/**
	 * 反序列化
	 */
	public void readFields(DataInput in) throws IOException {
		this.telNo = in.readUTF();
		this.upPayLoad = in.readLong();
		this.downPayLoad = in.readLong();
		this.totalPayLoad = in.readLong();
	}

	public String getTelNo() {
		return telNo;
	}

	public void setTelNo(String telNo) {
		this.telNo = telNo;
	}

	public long getUpPayLoad() {
		return upPayLoad;
	}

	public void setUpPayLoad(long upPayLoad) {
		this.upPayLoad = upPayLoad;
	}

	public long getDownPayLoad() {
		return downPayLoad;
	}

	public void setDownPayLoad(long downPayLoad) {
		this.downPayLoad = downPayLoad;
	}

	public long getTotalPayLoad() {
		return totalPayLoad;
	}

	public void setTotalPayLoad(long totalPayLoad) {
		this.totalPayLoad = totalPayLoad;
	}
	
}

/**
 * Example 3: sums upstream/downstream traffic per phone number. A custom Partitioner routes
 * numbers with the same carrier prefix to the same reducer.
 *
 * <p>Usage: {@code DataCount <input path> <output path> [numReduceTasks]}. When
 * numReduceTasks is omitted it defaults to {@link ServiceProviderPartitioner#PARTITIONS}.
 */
public class DataCount {

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);

		job.setJarByClass(DataCount.class);

		job.setMapperClass(DCMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(DataBean.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));

		job.setReducerClass(DCReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(DataBean.class);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		// Replace the default HashPartitioner with the carrier-prefix partitioner.
		job.setPartitionerClass(ServiceProviderPartitioner.class);
		// One reducer per carrier bucket gives one output file per carrier. Fewer reducers
		// still work because the partitioner folds its buckets into the available range.
		int reducers = args.length > 2
				? Integer.parseInt(args[2])
				: ServiceProviderPartitioner.PARTITIONS;
		job.setNumReduceTasks(reducers);

		// Report job failure through the exit status instead of always exiting with 0.
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

	/** Parses {@code telNo \t up \t down} and emits {@code (telNo, bean)}. */
	public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\t");
			// Columns: phone number, upstream traffic, downstream traffic.
			String telNo = fields[0];
			long up = Long.parseLong(fields[1]);
			long down = Long.parseLong(fields[2]);

			DataBean bean = new DataBean(telNo, up, down);
			context.write(new Text(telNo), bean);
		}
	}

	/** Sums the traffic of all records of one phone number. */
	public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{

		@Override
		protected void reduce(Text key, Iterable<DataBean> v2s, Context context)
				throws IOException, InterruptedException {
			long upSum = 0;
			long downSum = 0;
			for (DataBean item : v2s) {
				upSum += item.getUpPayLoad();
				downSum += item.getDownPayLoad();
			}
			// Keep the phone number in the bean. The text output is unchanged because
			// DataBean.toString() does not print it.
			context.write(key, new DataBean(key.toString(), upSum, downSum));
		}
	}

	/**
	 * Maps a phone number to a bucket by its 3-digit prefix.
	 * Buckets: 139 → 1, 138 → 2, 159 → 3, anything else (including keys shorter than
	 * 3 characters) → 0. The bucket is folded modulo the number of reducers, because
	 * Hadoop rejects partitions outside {@code [0, numPartitions)}.
	 */
	public static class ServiceProviderPartitioner extends Partitioner<Text, DataBean>{

		/** Number of distinct buckets this partitioner produces (0..3). */
		static final int PARTITIONS = 4;

		private static final Map<String, Integer> providerMap = new HashMap<String, Integer>();

		static {
			providerMap.put("139", 1);
			providerMap.put("138", 2);
			providerMap.put("159", 3);
		}

		@Override
		public int getPartition(Text key, DataBean value, int number) {
			String telNo = key.toString();
			Integer p = telNo.length() >= 3 ? providerMap.get(telNo.substring(0, 3)) : null;
			int bucket = p == null ? 0 : p;
			return bucket % number;
		}
	}
}

实例四、实现以下简单算法,其中mr程序涉及到了reduce分组等概念:

#当第一列相同时,求出第二列的最小值
3	3
3	2
3	1
2	2
2	1
1	1
----------结果---------
3	1
2	1
1	1

public class GroupApp {
	static final String INPUT_PATH = "hdfs://xxx:9000/input";
	static final String OUT_PATH = "hdfs://xxx:9000/out";
	public static void main(String[] args) throws Exception{
		final Configuration configuration = new Configuration();
		
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), configuration);
		if(fileSystem.exists(new Path(OUT_PATH))){
			fileSystem.delete(new Path(OUT_PATH), true);
		}
		
		final Job job = Job.getInstance(configuration, GroupApp.class.getSimpleName());
		
		//1.1 指定输入文件路径
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//指定哪个类用来格式化输入文件
		job.setInputFormatClass(TextInputFormat.class);
		
		//1.2指定自定义的Mapper类
		job.setMapperClass(MyMapper.class);
		//指定输出<k2,v2>的类型
		job.setMapOutputKeyClass(NewK2.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		//1.3 指定分区类
		job.setPartitionerClass(HashPartitioner.class);
		job.setNumReduceTasks(1);
		
		//1.4 TODO 排序、分区
		job.setGroupingComparatorClass(MyGroupingComparator.class);
		//1.5  TODO (可选)合并
		
		//2.2 指定自定义的reduce类
		job.setReducerClass(MyReducer.class);
		//指定输出<k3,v3>的类型
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		
		//2.3 指定输出到哪里
		FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
		//设定输出文件的格式化类
		job.setOutputFormatClass(TextOutputFormat.class);
		
		//把代码提交给JobTracker执行
		job.waitForCompletion(true);
	}

	
	static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable>{
		protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,NewK2,LongWritable>. throws java.io.IOException ,InterruptedException {
			final String[] splited = value.toString().split("\t");
			final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
			final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
			context.write(k2, v2);
		};
	}
	
	static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable>{
		protected void reduce(NewK2 k2, java.lang.Iterable<LongWritable> v2s, org.apache.hadoop.mapreduce.Reducer<NewK2,LongWritable,LongWritable,LongWritable>.Context context) throws java.io.IOException ,InterruptedException {
			long min = Long.MAX_VALUE;
			for (LongWritable v2 : v2s) {
				if(v2.get()<min){
					min = v2.get();
				}
			}
			
			context.write(new LongWritable(k2.first), new LongWritable(min));
		};
	}
	
	/**
	 * 问:为什么实现该类?
	 * 答:因为原来的v2不能参与排序,把原来的k2和v2封装到一个类中,作为新的k2
	 *
	 */
	static class  NewK2 implements WritableComparable<NewK2>{
		Long first;
		Long second;
		
		public NewK2(){}
		
		public NewK2(long first, long second){
			this.first = first;
			this.second = second;
		}
		
		
		public void readFields(DataInput in) throws IOException {
			this.first = in.readLong();
			this.second = in.readLong();
		}

		public void write(DataOutput out) throws IOException {
			out.writeLong(first);
			out.writeLong(second);
		}

		/**
		 * 当k2进行排序时,会调用该方法.
		 * 当第一列不同时,升序;当第一列相同时,第二列升序
		 */
		public int compareTo(NewK2 o) {
			final long minus = this.first - o.first;
			if(minus !=0){
				return (int)minus;
			}
			return (int)(this.second - o.second);
		}
		
		@Override
		public int hashCode() {
			return this.first.hashCode()+this.second.hashCode();
		}
		
		@Override
		public boolean equals(Object obj) {
			if(!(obj instanceof NewK2)){
				return false;
			}
			NewK2 oK2 = (NewK2)obj;
			return (this.first==oK2.first)&&(this.second==oK2.second);
		}
	}
	
	/**
	 * 问:为什么自定义该类?
	 * 答:业务要求分组是按照第一列分组,但是NewK2的比较规则决定了不能按照第一列分。只能自定义分组比较器。
	 */
	static class MyGroupingComparator implements RawComparator<NewK2>{

		public int compare(NewK2 o1, NewK2 o2) {
			return (int)(o1.first - o2.first);
		}
		/**
		 * @param arg0 表示第一个参与比较的字节数组
		 * @param arg1 表示第一个参与比较的字节数组的起始位置
		 * @param arg2 表示第一个参与比较的字节数组的偏移量
		 * 
		 * @param arg3 表示第二个参与比较的字节数组
		 * @param arg4 表示第二个参与比较的字节数组的起始位置
		 * @param arg5 表示第二个参与比较的字节数组的偏移量
		 */
		public int compare(byte[] arg0, int arg1, int arg2, byte[] arg3,
				int arg4, int arg5) {
			return WritableComparator.compareBytes(arg0, arg1, 8, arg3, arg4, 8);
		}
		
	}
}

实例五、利用MapReduce求解海量数据文件中的最大值。这里利用了Mapper类中的cleanup()函数:每个Map任务在其所有map()调用完成之后会调用一次cleanup(),因此可以在cleanup()中输出该任务见到的最大值,再由唯一的Reduce任务从这些候选值中求出全局最大值。


public class TopKApp {
	static final String INPUT_PATH = "hdfs://xxx:9000/input";
	static final String OUT_PATH = "hdfs://xxx:9000/out";
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}
		
		final Job job = Job.getInstance(conf , WordCountApp.class.getSimpleName());
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(NullWritable.class);
		FileOutputFormat.setOutputPath(job, outPath);
		job.waitForCompletion(true);
	}
	static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable>{
		long max = Long.MIN_VALUE;
		protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
			final long temp = Long.parseLong(v1.toString());
			if(temp>max){
				max = temp;
			}
		};
		
		protected void cleanup(org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,LongWritable, NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
			context.write(new LongWritable(max), NullWritable.get());
		};
	}
	
	static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable>{
		long max = Long.MIN_VALUE;
		protected void reduce(LongWritable k2, java.lang.Iterable<NullWritable> arg1, org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context arg2) throws java.io.IOException ,InterruptedException {
			final long temp = k2.get();
			if(temp>max){
				max = temp;
			}
		};
		
		protected void cleanup(org.apache.hadoop.mapreduce.Reducer<LongWritable,NullWritable,LongWritable,NullWritable>.Context context) throws java.io.IOException ,InterruptedException {
			context.write(new LongWritable(max), NullWritable.get());
		};
	}		
}

实例六、计数器:

public class WordCountApp {
	static final String INPUT_PATH = "hdfs://xxx:9000/hello";
	static final String OUT_PATH = "hdfs://xxx:9000/out";
	
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
		final Path outPath = new Path(OUT_PATH);
		if(fileSystem.exists(outPath)){
			fileSystem.delete(outPath, true);
		}
		
		final Job job =  Job.getInstance(conf , WordCountApp.class.getSimpleName());
		//1.1指定读取的文件位于哪里
		FileInputFormat.setInputPaths(job, INPUT_PATH);
		//指定如何对输入文件进行格式化,把输入文件每一行解析成键值对
		//job.setInputFormatClass(TextInputFormat.class);
		
		//1.2 指定自定义的map类
		job.setMapperClass(MyMapper.class);
		//map输出的<k,v>类型。如果<k3,v3>的类型与<k2,v2>类型一致,则可以省略
		//job.setMapOutputKeyClass(Text.class);
		//job.setMapOutputValueClass(LongWritable.class);
		
		//1.3 分区
		//job.setPartitionerClass(HashPartitioner.class);
		//有一个reduce任务运行
		//job.setNumReduceTasks(1);
		
		//1.4 TODO 排序、分组
		
		//1.5 TODO 规约
		
		//2.2 指定自定义reduce类
		job.setReducerClass(MyReducer.class);
		//指定reduce的输出类型
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		//2.3 指定写出到哪里
		FileOutputFormat.setOutputPath(job, outPath);
		//指定输出文件的格式化类
		//job.setOutputFormatClass(TextOutputFormat.class);
		
		//把job提交给JobTracker运行
		job.waitForCompletion(true);
	}
	
	/**
	 * KEYIN	即k1		表示行的偏移量
	 * VALUEIN	即v1		表示行文本内容
	 * KEYOUT	即k2		表示行中出现的单词
	 * VALUEOUT	即v2		表示行中出现的单词的次数,固定值1
	 */
	static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable>{
		protected void map(LongWritable k1, Text v1, Context context) throws java.io.IOException ,InterruptedException {
			final Counter helloCounter = context.getCounter("Sensitive Words", "hello");
			
			final String line = v1.toString();
			if(line.contains("hello")){
				//记录敏感词出现在一行中
				helloCounter.increment(1L);
			}
			final String[] splited = line.split("\t");
			for (String word : splited) {
				context.write(new Text(word), new LongWritable(1));
			}
		};
	}
	
	/**
	 * KEYIN	即k2		表示行中出现的单词
	 * VALUEIN	即v2		表示行中出现的单词的次数
	 * KEYOUT	即k3		表示文本中出现的不同单词
	 * VALUEOUT	即v3		表示文本中出现的不同单词的总次数
	 *
	 */
	static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable>{
		protected void reduce(Text k2, java.lang.Iterable<LongWritable> v2s, Context ctx) throws java.io.IOException ,InterruptedException {
			long times = 0L;
			for (LongWritable count : v2s) {
				times += count.get();
			}
			ctx.write(k2, new LongWritable(times));
		};
	}
		
}




www.htsjk.Com true http://www.htsjk.com/Hadoop/26743.html NewsArticle hadoop常用算法简单实例,hadoop实例 实例一、对以下数据进行排序,根据收入减去支出得到最后结余从大到小排序,数据如下: SumStep运行之后结果如下: SortStep运行之后结果为上图根据...
相关文章
    暂无相关文章
评论暂时关闭