欢迎投稿

今日深度:

使用MapReduce往HBase插入数据

使用MapReduce往HBase插入数据


import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.hbase.log.RecordParser;

/**
 * MapReduce job that copies high blood-pressure readings from text input into HBase.
 *
 * <p>The mapper parses every input line with {@link RecordParser} and emits
 * {@code (phone, bloodPressure)} for readings above the threshold. The reducer writes
 * one cell per phone number into column {@code f1:qualifier} of table {@code tab1}.
 */
public class HbaseInsertData {

	/** Readings strictly greater than this value are forwarded to the reducer. */
	private static final int BLOOD_PRESSURE_THRESHOLD = 150;

	/** Emits {@code (phone, bloodPressure)} for each record above the threshold. */
	public static class HbaseMapper 
		extends Mapper<LongWritable, Text, Text, Text>{
		
		RecordParser parser = new RecordParser();

		/**
		 * Parses one input line and forwards it when its blood pressure exceeds the threshold.
		 *
		 * @param key     input key supplied by the input format (unused)
		 * @param value   one line of input text
		 * @param context sink for the {@code (phone, bloodPressure)} pair
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			parser.parse(value);
			String phone = parser.getPhone();
			int bloodPressure = parser.getBloodPressure();
			if (bloodPressure > BLOOD_PRESSURE_THRESHOLD) {
				context.write(new Text(phone), new Text(String.valueOf(bloodPressure)));
			}
		}
	}
	
	/** Writes one HBase row per phone number, keyed by the phone number itself. */
	public static class HbaseReducer
		extends TableReducer<Text, Text, ImmutableBytesWritable> {

		private static final byte[] FAMILY = "f1".getBytes(StandardCharsets.UTF_8);
		private static final byte[] QUALIFIER = "qualifier".getBytes(StandardCharsets.UTF_8);

		/**
		 * Stores the first value received for {@code key} in column {@code f1:qualifier}.
		 *
		 * @param key     phone number, used as the row key
		 * @param values  blood-pressure readings for that phone; only the first one is stored
		 * @param context sink for the resulting {@link Put}
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values,
				Context context)
				throws IOException, InterruptedException {
			// Text.getBytes() hands back the backing buffer, of which only the first
			// getLength() bytes are valid. Going through toString() yields exactly the
			// key's bytes, so the row key never picks up stale trailing bytes.
			byte[] rowKey = key.toString().getBytes(StandardCharsets.UTF_8);
			String value = values.iterator().next().toString();

			Put putRow = new Put(rowKey);
			// Encode explicitly as UTF-8 instead of relying on the platform default charset.
			putRow.add(FAMILY, QUALIFIER, value.getBytes(StandardCharsets.UTF_8));
			
			context.write(new ImmutableBytesWritable(rowKey), putRow);
		}
	}
	
	/**
	 * Configures and runs the job, exiting with status 0 on success and 1 on failure.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if the job cannot be configured or submitted
	 */
	public static void main(String[] args) throws Exception{
		Configuration conf = HBaseConfiguration.create();
		// Do not forget to configure this. The property name must be exactly
		// "hbase.zookeeper.quorum"; a trailing dot sets an unrelated key that HBase ignores.
		conf.set("hbase.zookeeper.quorum", "localhost");

		Job job = new Job(conf, "count");
		
		job.setJarByClass(HbaseInsertData.class);
		job.setMapperClass(HbaseMapper.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		
		Path in = new Path("hdfs://localhost:9000/input");
		FileInputFormat.addInputPath(job, in);
		
		// Wires the reducer and the table output format for table "tab1".
		TableMapReduceUtil.initTableReducerJob("tab1", HbaseReducer.class, job);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

 

用于解析记录的类 RecordParser:

import org.apache.hadoop.io.Text;

/**
 * Parses one comma-separated log record and exposes the fields read from it.
 *
 * <p>Field positions (0-based): 1 = id, 3 = phone, 4 = blood pressure. An instance holds
 * the values of the most recently parsed record.
 */
public class RecordParser {
	
	private String id;
	private String phone;
	private int bloodPressure;
	
	/**
	 * Parses a comma-separated record and stores its id, phone and blood pressure.
	 *
	 * <p>All fields are extracted before any state is modified, so a malformed record
	 * leaves the previously parsed values intact instead of a mix of two records.
	 * Whitespace around the blood-pressure field is ignored.
	 *
	 * @param record comma-separated line with at least five fields
	 * @throws ArrayIndexOutOfBoundsException if the record has fewer than five fields
	 * @throws NumberFormatException if the fifth field is not an integer
	 */
	public void parse(String record) {
		String[] fields = record.split(",");
		String parsedId = fields[1];
		String parsedPhone = fields[3];
		int parsedBloodPressure = Integer.parseInt(fields[4].trim());

		// Commit only after every field was read successfully.
		id = parsedId;
		phone = parsedPhone;
		bloodPressure = parsedBloodPressure;
	}
	
	/**
	 * Parses a record held in a Hadoop {@code Text} by delegating to {@link #parse(String)}.
	 *
	 * @param record the line to parse
	 */
	public void parse(Text record) {
		this.parse(record.toString());
	}

	/** @return the id (field 1) of the most recently parsed record */
	public String getId() {
		return id;
	}

	/** @return the phone number (field 3) of the most recently parsed record */
	public String getPhone() {
		return phone;
	}

	/** @return the blood pressure (field 4) of the most recently parsed record */
	public int getBloodPressure() {
		return bloodPressure;
	}
}
 

www.htsjk.Com true http://www.htsjk.com/teradata/28955.html NewsArticle 使用MapReduce往Hbase插入数据, import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hbase.client.Put;impor...
相关文章
    暂无相关文章
评论暂时关闭