WordCount in Java on Hadoop
The text to process is:
hello,word,spark,hbase,hello hive,sqoop,impala,hdfs flume,kafka,flink,cloudera,spark,sqoop hello,word,spark,hbase,hello hive,sqoop,impala,hdfs flume,kafka,flink,cloudera,spark,sqoop
The requirement is to count how many times each word appears in the text above.
First, define the Mapper:
package com.mystudy.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordAccountMapper1 extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The sample text is delimited by both commas and spaces, so split on either
        String[] split = value.toString().split("[,\\s]+");
        for (String word : split) {
            // Emit each word with a count of 1
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
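Note that the sample text uses both commas and spaces as separators, which is why the mapper splits on the regex [,\s]+ rather than on "," alone; splitting only on commas would leave compound tokens such as "hello hive". A quick standalone check of what the split produces (the class name SplitCheck is just for illustration):

public class SplitCheck {
    public static void main(String[] args) {
        // The beginning of the sample text, taken verbatim
        String line = "hello,word,spark,hbase,hello hive";
        // Split on one or more commas or whitespace characters, as the mapper does
        for (String word : line.split("[,\\s]+")) {
            System.out.println(word);
        }
        // Prints hello, word, spark, hbase, hello, hive -- one per line
    }
}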
Next, define the Reducer:
package com.mystudy.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordAccountReduce1 extends Reducer<Text, LongWritable, Text, LongWritable> {
    // key is the word; values is an iterable over its counts, e.g. (1, 1, 1, 1);
    // context is the framework's context object used to emit output
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
Finally, define the driver class with the main method:
package com.mystudy.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordAccountMain1 extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), WordAccountMain1.class.getSimpleName());
        // When submitting the packaged jar to a cluster, the driver class must be set
        job.setJarByClass(WordAccountMain1.class);

        // Step 1: read the input file and parse it into key/value pairs
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://IPaddress:8020/wordcount"));

        // Step 2: set the mapper class and its output types
        job.setMapperClass(WordAccountMapper1.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Steps 3-6 are partitioning, sorting, combining, and grouping;
        // leaving them unset here means the defaults are used

        // Step 7: set the reducer class and the job's final output types
        job.setReducerClass(WordAccountReduce1.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Step 8: set the output format and output path (must not already exist)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://IPaddress:8020/wordcount/out"));

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Tool jobMain = new WordAccountMain1();
        int run = ToolRunner.run(configuration, jobMain, args);
        System.exit(run);
    }
}
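Because summing counts is commutative and associative, the reducer can also be reused as a combiner so partial sums are computed on the map side and less data is shuffled. This is an optional tweak, not part of the original job setup; a minimal sketch, to be placed alongside the other job settings in run():

// Optional: reuse the reducer as a combiner to pre-aggregate counts map-side
job.setCombinerClass(WordAccountReduce1.class);

After packaging the project into a jar, the job can be submitted with something like hadoop jar wordcount.jar com.mystudy.mapreduce.WordAccountMain1 (the jar name here is illustrative). Remember that the output directory must not already exist, or the job will fail at submission.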
This gives us the output with the number of occurrences of each word.
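Assuming the input path contains exactly the sample line above, the result file (part-r-00000 under /wordcount/out) should hold tab-separated word/count pairs, sorted lexicographically by key:

cloudera	2
flink	2
flume	2
hbase	2
hdfs	2
hello	4
hive	2
impala	2
kafka	2
spark	4
sqoop	4
word	2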