欢迎投稿

今日深度:

HBase之MapReduce


需求:使用MapReduce按月统计HBase列式数据库中某人的通话数量。注意:下面的作业会扫描整张 t_phone_cdr 表;如果只想统计某一个人,需要通过 Scan 的行键范围(startRow/stopRow)或过滤器把数据限定到该用户。

Step 1:
Mapper:

public class MyMapper extends TableMapper<IntWritable, IntWritable>{

    protected void map(
            ImmutableBytesWritable key,
            Result value,
            Context context)
            throws IOException, InterruptedException {

        Cell cell = value.getColumnLatestCell("cf1".getBytes(), "cdr".getBytes());
        byte[] data =CellUtil.cloneValue(cell);
        Cdr.days_cdr days_cdr =Cdr.days_cdr.parseFrom(data);
        for (Iterator i = days_cdr.getListCdrList().iterator(); i.hasNext();) {
            Cdr.phone_cdr phone_cdr = (Cdr.phone_cdr) i.next();
            if(phone_cdr!=null){
                String time =phone_cdr.getTime();
                Calendar c =Calendar.getInstance();
                try {
                    c.setTime(new SimpleDateFormat("yyyyMMddHHmmss").parse(time));
                    int m =c.get(Calendar.MONTH);
                    context.write(new IntWritable(m), new IntWritable(1));
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

Step 2:
Reducer:

public class MyReducer extends Reducer<IntWritable, IntWritable, Text, IntWritable>{

    protected void reduce(IntWritable arg0, Iterable<IntWritable> arg1,
            Context arg2)
            throws IOException, InterruptedException {
        int sum =0;
        for(IntWritable i:arg1){
            sum=sum+i.get();
        }
        arg2.write(new Text(arg0.get()+"的通话数量为:"), new IntWritable(sum));
    }
}

Step 3:
MainJob:

/**
 * Driver for the call-count job. It scans HBase table {@code t_phone_cdr} with
 * {@link MyMapper}, sums counts per key with {@link MyReducer}, and writes text output to HDFS
 * under {@code /usr/output/wc}.
 */
public class RunJob {

    /**
     * Configures, submits and waits for the job. Any existing output directory is deleted first.
     *
     * <p>Terminates the JVM with exit status 1 when the job fails or an exception is thrown, so
     * shell scripts and schedulers can detect a failed run.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node12:8020");
        conf.set("hbase.zookeeper.quorum", "node12,node13,node11");
        // NOTE(review): dfs.permissions is a NameNode-side setting; setting it in the client
        // configuration probably has no effect. Confirm against the cluster setup.
        conf.set("dfs.permissions", "false");
        // Returns a new Configuration combining HBase's hbase-*.xml resources with the settings above.
        conf = HBaseConfiguration.create(conf);
        // When submitting from an IDE, point mapred.jar at a pre-built job jar, for example:
        // conf.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");

        boolean succeeded = false;
        try {
            // FileSystem.get returns a shared, cached instance, so it is intentionally not closed.
            FileSystem fs = FileSystem.get(conf);
            Job job = Job.getInstance(conf, "wc");
            job.setJarByClass(RunJob.class);

            // MyReducer cannot double as a combiner. It emits (Text, IntWritable), but a combiner
            // must emit the map output types (IntWritable, IntWritable).
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(IntWritable.class);

            Scan scan = new Scan();
            // Fetch only the single column MyMapper reads, instead of whole rows.
            scan.addColumn("cf1".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                    "cdr".getBytes(java.nio.charset.StandardCharsets.UTF_8));
            scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
            scan.setCacheBlocks(false);  // don't set to true for MR jobs

            // Sets the table input format and the mapper. Passing the real map output classes
            // (rather than null) keeps them consistent with the calls above.
            TableMapReduceUtil.initTableMapperJob("t_phone_cdr", scan, MyMapper.class,
                    IntWritable.class, IntWritable.class, job, false);

            Path output = new Path("/usr/output/wc");
            if (fs.exists(output)) {
                // The job refuses to start if its output directory already exists.
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            succeeded = job.waitForCompletion(true);
            if (succeeded) {
                System.out.println("job 执行成功");
            } else {
                System.err.println("job 执行失败");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status before reporting.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (!succeeded) {
            // Signal failure to the caller via a non-zero exit status.
            System.exit(1);
        }
    }
}

原文链接:http://www.htsjk.com/hbase/41979.html
相关文章
    暂无相关文章
评论暂时关闭