HBase 之 MapReduce
需求:使用 MapReduce 统计 HBase 列式数据库中某人每个月的通话数量
Step 1:
Mapper:
public class MyMapper extends TableMapper<IntWritable, IntWritable>{
protected void map(
ImmutableBytesWritable key,
Result value,
Context context)
throws IOException, InterruptedException {
Cell cell = value.getColumnLatestCell("cf1".getBytes(), "cdr".getBytes());
byte[] data =CellUtil.cloneValue(cell);
Cdr.days_cdr days_cdr =Cdr.days_cdr.parseFrom(data);
for (Iterator i = days_cdr.getListCdrList().iterator(); i.hasNext();) {
Cdr.phone_cdr phone_cdr = (Cdr.phone_cdr) i.next();
if(phone_cdr!=null){
String time =phone_cdr.getTime();
Calendar c =Calendar.getInstance();
try {
c.setTime(new SimpleDateFormat("yyyyMMddHHmmss").parse(time));
int m =c.get(Calendar.MONTH);
context.write(new IntWritable(m), new IntWritable(1));
} catch (ParseException e) {
e.printStackTrace();
}
}
}
}
}
Step 2:
Reducer:
public class MyReducer extends Reducer<IntWritable, IntWritable, Text, IntWritable>{
protected void reduce(IntWritable arg0, Iterable<IntWritable> arg1,
Context arg2)
throws IOException, InterruptedException {
int sum =0;
for(IntWritable i:arg1){
sum=sum+i.get();
}
arg2.write(new Text(arg0.get()+"的通话数量为:"), new IntWritable(sum));
}
}
Step 3:
MainJob:
/**
 * Driver that scans the HBase table {@code t_phone_cdr} with {@link MyMapper}, totals the
 * counts with {@link MyReducer}, and writes the result to {@code /usr/output/wc} on HDFS.
 */
public class RunJob {

    /**
     * Configures and runs the job, then exits with status 0 on success or 1 on failure.
     * The non-zero status lets shell scripts and schedulers detect a failed run.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://node12:8020");
        conf.set("hbase.zookeeper.quorum", "node12,node13,node11");
        // NOTE(review): dfs.permissions is usually a NameNode-side setting; confirm it has any
        // effect when set on the client.
        conf.set("dfs.permissions", "false");
        // Layer HBase's default configuration over the settings above.
        conf = HBaseConfiguration.create(conf);

        boolean succeeded;
        try {
            succeeded = runJob(conf);
        } catch (InterruptedException e) {
            // Preserve the interrupt status before exiting.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            succeeded = false;
        } catch (Exception e) {
            e.printStackTrace();
            succeeded = false;
        }

        if (succeeded) {
            System.out.println("job 执行成功");
        } else {
            System.err.println("job 执行失败");
        }
        System.exit(succeeded ? 0 : 1);
    }

    /**
     * Builds the job, clears any previous output directory, and blocks until the job finishes.
     *
     * @param conf the combined Hadoop/HBase configuration
     * @return {@code true} if the job completed successfully
     * @throws Exception on HDFS, HBase, or job-submission errors, or if interrupted
     */
    private static boolean runJob(Configuration conf) throws Exception {
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf, "wc");
        job.setJarByClass(RunJob.class);
        job.setReducerClass(MyReducer.class);
        // MyReducer cannot double as a combiner: it emits Text keys, while the map output key is IntWritable.

        Scan scan = new Scan();
        // Fetch many rows per RPC; small scanner caching makes full-table MapReduce scans slow.
        scan.setCaching(500);
        // A one-pass full scan should not evict hot data from the RegionServer block cache.
        scan.setCacheBlocks(false);
        // Passing the map output classes here sets job.setMapOutputKeyClass/ValueClass.
        TableMapReduceUtil.initTableMapperJob(
                "t_phone_cdr", scan, MyMapper.class, IntWritable.class, IntWritable.class, job, false);

        // FileOutputFormat refuses to run if the output directory already exists.
        Path output = new Path("/usr/output/wc");
        if (fs.exists(output)) {
            fs.delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        return job.waitForCompletion(true);
    }
}
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时,文章内容如有侵犯您的权益,请联系 QQ:970679559,我们会尽快处理。