欢迎投稿

今日深度:

hive to hbase,

hive to hbase,


-- hive -e 'show create table grades' > table
CREATE TABLE `mydb.grades`(
  `id` int COMMENT 'ID', 
  `name` string COMMENT '姓名', 
  `age` int COMMENT '年龄')
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
  LINES TERMINATED BY '\n' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.RCFileInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.RCFileOutputFormat'
LOCATION
  'hdfs://192.168.253.11:9000/user/root/hive/warehouse/mydb.db/grades'
TBLPROPERTIES (
  'COLUMN_STATS_ACCURATE'='true', 
  'numFiles'='1', 
  'numRows'='0', 
  'rawDataSize'='0', 
  'totalSize'='30', 
  'transient_lastDdlTime'='1457602162')


package edu.wzm.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.data.schema.HCatSchema;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by GatsbyNewton on 2016/3/24.
 */
public class HiveTableUtils {

    //Gain hive table columns by parsing file.
    public static List<String>  getFieldName(String filePath){
        File file = new File(filePath);
        BufferedReader reader = null;
        List<String> fieldName = new ArrayList<String>();

        try {
            if (file.exists()) {
                reader = new BufferedReader(new FileReader(file));
                String tmp = null;
                while ((tmp = reader.readLine()) != null) {
                    if (tmp.contains("`") && tmp.contains("COMMENT")) {
                        int start = tmp.indexOf("`");
                        int end = tmp.lastIndexOf("`");
                        fieldName.add(tmp.substring(start + 1, end));
                    }
                }
            } else {
                System.err.println("The file doesn't exist!");
                System.exit(1);
            }

            reader.close();
        }
        catch (Exception e) {
            e.printStackTrace();
        }

        return fieldName;
    }

import edu.wzm.transform.RCFileToHFile;
import edu.wzm.utils.HiveTableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.SimpleTotalOrderPartitioner;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hive.hcatalog.common.HCatUtil;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceInputFormat;

import java.util.List;

/**
 * Created by GatsbyNewton on 2016/3/24.
 */
public class Driver extends Configured implements Tool{

    private static Configuration conf = new Configuration();
    private static Configuration hconf = null;
    private static HBaseAdmin hadmin = null;

    public static void connectHBase(){
        final String HBASE_CONFIG_ZOOKEEPER_CLIENT = "hbase.zookeeper.property.clientPort";
        final String HBASE_ZOOKEEPER_CLIENT_PORT = "2181";
        final String HBASE_CONFIG_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
        final String HBASE_ZOOKEEPER_SERVER = "hbase38,hbase43,hbase00";

        conf.set(HBASE_CONFIG_ZOOKEEPER_CLIENT, HBASE_ZOOKEEPER_CLIENT_PORT);
        conf.set(HBASE_CONFIG_ZOOKEEPER_QUORUM, HBASE_ZOOKEEPER_SERVER);
        hconf = HBaseConfiguration.create(conf);
        try{
            hadmin = new HBaseAdmin(hconf);
        }
        catch (Exception e){
            e.printStackTrace();
        }
    }


    public static void main(String[] args)throws Exception{
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if(otherArgs.length != 4){
            System.err.println("Usage: <rcfile> <hfile> <schemafile> <hbasetable>");
            System.exit(1);
        }

        String path = System.getProperty("user.dir") + otherArgs[2];
        List<String> fieldNames = HiveTableUtils.getFieldName(path);
        StringBuilder sb = new StringBuilder(fieldNames.get(0));
        int size = fieldNames.size();
        for(int i = 1; i < size; i++){
            sb.append(":").append(fieldNames.get(i));
        }

        conf.set("schema", sb.toString());
        
        if(ToolRunner.run(conf, new Driver(), otherArgs) == 0){
            // Importing the generated HFiles into a HBase table
            LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
            loader.doBulkLoad(new Path(otherArgs[1], otherArgs[3]);
            System.exit(0);
        }
        else{
            System.exit(1);
        }
    }

    @SuppressWarnings("deprecation")
    @Override
    public int run(String[] strings) throws Exception {

        Configuration config = getConf();
        Driver.connectHBase();

        Job job = new Job(config, "RCFile to HFile");
        job.setJarByClass(Driver.class);
        job.setMapperClass(RCFileToHFile.ParseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(KeyValue.class);

        //Reduce's number is 0.
        job.setNumReduceTasks(0);

        job.setPartitionerClass(SimpleTotalOrderPartitioner.class);

        job.setInputFormatClass(RCFileMapReduceInputFormat.class);
//        job.setOutputFormatClass(HFileOutputFormat.class);

        HTable table = new HTable(config, strings[3]);
        HFileOutputFormat.configureIncrementalLoad(job, table);

        RCFileMapReduceInputFormat.addInputPath(job, new Path(strings[0]));
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}


import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import edu.wzm.utils.HiveTableUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hive.hcatalog.rcfile.RCFileMapReduceInputFormat;

public class RCFileToHFile {
    
    public static class ParseMapper extends Mapper<LongWritable, BytesRefArrayWritable, ImmutableBytesWritable, KeyValue>{
//        private List<String> fieldName = null;
        private String[] fieldName = null;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            super.setup(context);
            Configuration conf = context.getConfiguration();
            
            String schema = conf.get("schema");
            fieldName = schema.split(":");

//            fieldName = new ArrayList<String>();
//            fieldName.add("id");
//            fieldName.add("name");
//            fieldName.add("age");
        }
        
        @Override
        protected void map(LongWritable key, BytesRefArrayWritable values,
                Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub

            Text line = new Text();
            List<String> fields = new ArrayList<String>();
            int size = values.size();
            for(int i = 0; i < size; i++){
                BytesRefWritable value = values.get(i);
                line.set(value.getData(), value.getStart(), value.getLength());
                fields.add(line.toString());
            }
            
            String rowKey = fields.get(0);
            String columnFamily = "cf";
            int length = fieldName.length;
            ImmutableBytesWritable hKey = new ImmutableBytesWritable();
            hKey.set(rowKey.getBytes());
            KeyValue kv = null;
            for(int i = 1; i < length; i++){
                kv = new KeyValue(hKey.get(), columnFamily.getBytes(), fieldName[i].getBytes(), fields.get(i).getBytes());
                context.write(hKey, kv);
            }
            
        }
    }

}
 

www.htsjk.Com true http://www.htsjk.com/hbase/40384.html NewsArticle hive to hbase, -- hive -e 'show create table grades' table CREATE TABLE `mydb.grades`(   `id` int COMMENT 'ID',    `name` string COMMENT '姓名',    `age` int COMMENT '年龄') ROW FORMAT DELIMITED    FIELDS TERMINATED BY ','   ...
相关文章
    暂无相关文章
评论暂时关闭