WordCount on Hadoop (Java version)

The text to be processed is:

hello,word,spark,hbase,hello
hive,sqoop,impala,hdfs
flume,kafka,flink,cloudera,spark,sqoop
hello,word,spark,hbase,hello
hive,sqoop,impala,hdfs
flume,kafka,flink,cloudera,spark,sqoop

The requirement: count how many times each word appears in the text above.
First, define the Mapper:

package com.mystudy.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

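// Mapper generics: input key = byte offset of the line (LongWritable), input value = the line itself (Text),
// output key = a single word (Text), output value = its count (LongWritable)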
public class WordAccountMapper1 extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line into words on the commas
        String[] split = value.toString().split(",");
        for (String word : split) {
            // Emit each word with a count of 1
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
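
The mapper can be exercised locally before running anything on a cluster. Below is a minimal test sketch, assuming the (now retired but still usable) Apache MRUnit library and JUnit 4 are on the classpath; the test class name and the sample line are only illustrative.

package com.mystudy.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class WordAccountMapper1Test {
    @Test
    public void mapEmitsOnePerWord() throws Exception {
        // Drive the mapper with a single line and check that one (word, 1) pair comes out per token, in order
        MapDriver<LongWritable, Text, Text, LongWritable> driver =
                MapDriver.newMapDriver(new WordAccountMapper1());
        driver.withInput(new LongWritable(0), new Text("hello,word,hello"))
              .withOutput(new Text("hello"), new LongWritable(1))
              .withOutput(new Text("word"), new LongWritable(1))
              .withOutput(new Text("hello"), new LongWritable(1))
              .runTest();
    }
}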

Next, define the Reducer:

package com.mystudy.mapreduce;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
public class WordAccountReduce1 extends Reducer<Text, LongWritable, Text, LongWritable> {
    // key is the word; values iterates over all the 1s emitted for that word, e.g. (1, 1, 1, 1);
    // context is the job context used to write the result out
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        // Emit the word together with its total count
        context.write(key, new LongWritable(count));
    }
}
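
Because the reduce step is a plain associative and commutative sum, the same class can also be registered as a combiner, so that (word, 1) pairs are pre-aggregated on the map side and less data is shuffled. This is optional; a one-line sketch of what would be added to the driver shown below:

// Optional: reuse the reducer as a combiner to pre-aggregate counts on the map side
job.setCombinerClass(WordAccountReduce1.class);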

Finally, define the driver (main) class:

package com.mystudy.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordAccountMain1 extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), WordAccountMain1.class.getSimpleName());
        // When the job is packaged into a jar and run on the cluster, the driver class must be set explicitly
        job.setJarByClass(WordAccountMain1.class);
        // Step 1: read the input files and parse each line into a key/value pair
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://IPaddress:8020/wordcount"));

        // Step 2: set the Mapper class and its output key/value types
        job.setMapperClass(WordAccountMapper1.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Steps 3 to 6: partitioning, sorting, combining and grouping; nothing is set here, so the defaults apply

        // Step 7: set the Reducer class
        job.setReducerClass(WordAccountReduce1.class);
        // Set the output key/value types of the reduce phase
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Step 8: set the output format class and the output path
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://IPaddress:8020/wordcount/out"));
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Tool jobMain = new WordAccountMain1();
        int run = ToolRunner.run(configuration, jobMain, args);
        System.exit(run);
    }
}
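
To run the job on a cluster, the three classes are typically packaged into a jar and submitted with the hadoop command line, and the result can then be inspected with hdfs dfs; the jar file name below is only an illustration:

hadoop jar wordcount.jar com.mystudy.mapreduce.WordAccountMain1
hdfs dfs -cat /wordcount/out/part-r-00000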

With that, the job produces the count of how many times each word appears.
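
For reference, with the default TextOutputFormat (one tab-separated key/value pair per line, keys sorted by the shuffle) the six input lines above should produce a part-r-00000 file that looks roughly like this:

cloudera	2
flink	2
flume	2
hbase	2
hdfs	2
hello	4
hive	2
impala	2
kafka	2
spark	4
sqoop	4
word	2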
