欢迎投稿

今日深度:

HBase MapReduce,

HBase MapReduce,


package nuc.edu.ls;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;



/**
 * Word-count MapReduce job reading from one HBase table and writing to another.
 *
 * <p>Input table {@code word}: column family {@code col}, qualifier {@code line}
 * holds one sentence per row. Output table {@code stat}: the row key is the word;
 * column family {@code ret}, qualifier {@code count} holds the total occurrences
 * encoded as a big-endian int ({@link Bytes#toBytes(int)}).
 *
 * @author LENOVO
 * @date 2018-09-05
 */
public class HbaseMR {

	/**
	 * Mapper: splits each row's sentence on single spaces and emits (word, 1).
	 *
	 * <p>Extends {@link TableMapper}, so only the two output type parameters are
	 * declared here — the input types (rowkey, {@link Result}) are predefined.
	 */
	public static class MapTask extends TableMapper<Text, IntWritable> {
		// Constant "1" emitted for every word occurrence.
		private static final IntWritable ONE = new IntWritable(1);
		// Reusable output key; per-instance (not static) so mapper instances
		// never share mutable state.
		private final Text word = new Text();

		@Override
		protected void map(ImmutableBytesWritable key, Result value,
				Mapper<ImmutableBytesWritable, Result, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			byte[] lineBytes = value.getValue(Bytes.toBytes("col"), Bytes.toBytes("line"));
			if (lineBytes == null) {
				// Row has no col:line cell; the original code would NPE here.
				return;
			}
			// Bytes.toString decodes as UTF-8 explicitly; the original
			// new String(byte[]) used the platform-default charset.
			String line = Bytes.toString(lineBytes);
			for (String token : line.split(" ")) {
				if (token.isEmpty()) {
					continue; // skip empty tokens from leading/consecutive spaces
				}
				word.set(token);
				context.write(word, ONE);
			}
		}
	}

	/**
	 * Reducer: sums the counts for each word and writes one {@link Put} per word
	 * to the output table.
	 *
	 * <p>Extends {@link TableReducer}; the output value type is fixed to
	 * {@link Mutation}, and the declared type parameter is the output rowkey.
	 */
	public static class ReduceTask extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, ImmutableBytesWritable, Mutation>.Context context)
				throws IOException, InterruptedException {
			int sum = 0;
			// Sum the actual values instead of counting elements ("sum++"):
			// correct even if a combiner pre-aggregates partial counts.
			for (IntWritable count : values) {
				sum += count.get();
			}
			byte[] rowKey = Bytes.toBytes(key.toString());
			Put put = new Put(rowKey);
			put.addColumn(Bytes.toBytes("ret"), Bytes.toBytes("count"), Bytes.toBytes(sum));
			context.write(new ImmutableBytesWritable(rowKey), put);
		}
	}

	/**
	 * Configures and submits the job, then exits with 0 on success, 1 on failure.
	 *
	 * @param args unused
	 * @throws Exception if job configuration or submission fails
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		// ZooKeeper quorum can be supplied via hbase-site.xml on the classpath:
		//conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
		Job job = Job.getInstance(conf, "hbasemr");
		job.setJarByClass(HbaseMR.class);

		// Scan over the input table; filters/caching could be configured here.
		Scan scan = new Scan();

		// Wire the mapper/reducer and their output types to the HBase tables.
		TableMapReduceUtil.initTableMapperJob("word", scan, MapTask.class, Text.class, IntWritable.class, job);
		TableMapReduceUtil.initTableReducerJob("stat", ReduceTask.class, job);

		boolean ok = job.waitForCompletion(true);
		System.out.println(ok ? "程序没毛病" : "出bug了");
		// Report the outcome via the process exit code so schedulers/scripts
		// can detect failure (the original always exited 0).
		System.exit(ok ? 0 : 1);
	}
}

 

www.htsjk.Com true http://www.htsjk.com/hbase/39423.html NewsArticle HBase MapReduce, package nuc.edu.ls;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Mutation;import org.apache.hadoop.hbase.cli...
相关文章
    暂无相关文章
评论暂时关闭