HBase导入TXT文件,
1.文件导入HDFS创建一个txt测试文件 (列与列之间用 ","分割) vi test.txt
将文件传入HDFS hadoop fs -put test.txt /user/training
查看目录下的文件 hadoop fs -ls /user/training 查看文件内容 hadoop fs -cat /user/training/test.txt
2.在Hbase中新建表
hbase(main):004:0> create 'dbtest','cf'
3.在eclipse中运行下面的代码
import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.mapreduce.TableReducer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
public class BatchImport { static Configuration conf = null; static { conf = HBaseConfiguration.create(); } static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{ SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss"); Text v2 = new Text(); protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException { final String[] splited = value.toString().split(","); try { final Date date = new Date(Long.parseLong(splited[0].trim())); final String dateFormat = dateformat1.format(date); String rowKey = splited[1]+":"+dateFormat; v2.set(rowKey+","+value.toString()); context.write(key, v2); } catch (NumberFormatException e) { final Counter counter = context.getCounter("BatchImport", "ErrorFormat"); counter.increment(1L); System.out.println("error"+splited[0]+" "+e.getMessage()); } }; } static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{ protected void reduce(LongWritable key, java.lang.Iterable<Text> values, Context context) throws java.io.IOException ,InterruptedException { for (Text text : values) { final String[] splited = text.toString().split(","); for(int i=0;i<splited.length;i++){ System.out.println(splited[i]); } final Put put = new Put(Bytes.toBytes(splited[0])); put.add(Bytes.toBytes("cf"), Bytes.toBytes("col1"), Bytes.toBytes(splited[1])); put.add(Bytes.toBytes("cf"), Bytes.toBytes("col2"), Bytes.toBytes(splited[2])); put.add(Bytes.toBytes("cf"),Bytes.toBytes("col3"), Bytes.toBytes(splited[3])); context.write(NullWritable.get(), put); } }; } public static void main(String[] args) throws Exception {
conf.set("hbase.zookeeper.quorum", "localhost");
conf.set(TableOutputFormat.OUTPUT_TABLE, "dbtest");
conf.set("dfs.socket.timeout", "180000"); final Job job = new Job(conf, "HBaseBatchImport"); job.setMapperClass(BatchImportMapper.class); job.setReducerClass(BatchImportReducer.class);
job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TableOutputFormat.class); FileInputFormat.setInputPaths(job, "hdfs://localhost:8020/user/training/test_2.txt"); job.waitForCompletion(true); } }
4.查看HBase中数据
hbase(main):004:0> scan 'dbtest'
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。