HBase — Operating HBase from MapReduce

Goal of this section: learn to write MapReduce jobs that read from and write to HBase.
From the official examples you can see that a custom map function which reads its input from HBase extends TableMapper, and a custom reduce function which writes its output to HBase extends TableReducer.
Test example:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Read text from HDFS and save the word-count result into HBase.
 */

/**
 * If the map function extended TableMapper, it would read its input from an
 * HBase table; here it still reads plain text from HDFS, so it extends the
 * ordinary Mapper.
 */
class WCMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * key:     byte offset of the current line
     * value:   text content of the current line
     * context: context used to emit output
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        IntWritable num = new IntWritable(1);
        String[] words = value.toString().split(" ");
        for (String word : words) {
            context.write(new Text(word), num);
        }
    }
}
/**
 * Extends TableReducer so the result is written into an HBase table.
 * Text:                   input key type
 * IntWritable:            input value type
 * ImmutableBytesWritable: output key type (the output value type is fixed by
 *                         TableReducer to a mutation such as Put)
 */
class WCReduce2 extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text rowkey, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i : values) {
            sum += i.get();
        }
        // Row key = the word; column cf:count = total number of occurrences
        Put put = new Put(Bytes.toBytes(rowkey.toString()));
        // put.add(...) is the pre-2.0 API; on HBase 2.x use put.addColumn(...)
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(sum));
        context.write(null, put);
    }
}
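Stripped of the Hadoop and HBase machinery, the map/reduce pair above is an ordinary word count: the map step splits each line on spaces and emits (word, 1), and the reduce step sums the ones per word. A minimal plain-Java sketch of that same logic (no cluster needed), illustrative only:

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Same logic as WCMap + WCReduce2: split on spaces, then sum per word
    static Map<String, Integer> wordCount(String text) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : text.split(" ")) {   // the "map" step
            counts.merge(word, 1, Integer::sum); // the "reduce" step
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(wordCount("hadoop hbase hadoop"));
    }
}
```

In the real job the per-word sum ends up as the value of the cf:count column, with the word itself as the row key.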
public class WCJob {
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Configuration files under src are loaded automatically.
        // (For local testing, do not put the MR configuration files under src.)
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf); // instantiate the job
        job.setJarByClass(WCJob.class);
        job.setMapperClass(WCMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Input file on HDFS
        FileInputFormat.addInputPath(job, new Path("/wc/input/wc.txt"));
        // Configure the reduce side: write the result into the HBase table "wc"
        String targetTable = "wc";
        TableMapReduceUtil.initTableReducerJob(targetTable, WCReduce2.class, job);
        boolean flag = job.waitForCompletion(true);
        if (flag) {
            System.out.println("Job Success!");
        }
    }
}
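One thing to be aware of when checking the result: the reducer stores the count with Bytes.toBytes(sum), i.e. as a raw 4-byte big-endian integer rather than a printable string, so a scan in the HBase shell shows it as escaped hex (e.g. \x00\x00\x00\x05 for 5). The encoding matches the JDK's big-endian ByteBuffer; a quick plain-Java illustration (not HBase code):

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class IntBytesDemo {
    public static void main(String[] args) {
        // HBase's Bytes.toBytes(int) produces the same 4 big-endian bytes
        byte[] b = ByteBuffer.allocate(4).putInt(5).array();
        System.out.println(Arrays.toString(b)); // [0, 0, 0, 5]
    }
}
```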
Running this locally (e.g. from a Windows IDE) fails: on submission the client adds the jars from the local classpath to the distributed cache, and the Windows paths cannot be resolved against HDFS:

Exception in thread "main" java.lang.IllegalArgumentException: Pathname /D:/source_maven/myhbase/lib/netty-all-4.0.50.Final.jar from hdfs://master:8020/D:/source_maven/myhbase/lib/netty-all-4.0.50.Final.jar is not a valid DFS filename.
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:187)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:101)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1068)
    at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1064)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1064)
    at org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager.getFileStatus
Testing in the server environment instead:
1) Add the 4 configuration files under src.
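The original does not list the four files at this point; in this kind of setup they are typically Hadoop's core-site.xml, hdfs-site.xml, mapred-site.xml plus HBase's hbase-site.xml (an assumption, not stated by the source). As one sketch, a minimal hbase-site.xml so the client can find the cluster — the quorum host here is a placeholder matching the "master" host seen in the error above:

```xml
<!-- hbase-site.xml: a minimal sketch; the quorum host is an assumption -->
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>master</value>
  </property>
</configuration>
```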