hive导入数据到hbase,hive导入数据hbase
背景
hive有一张表user_tag_detail,表中数据约1.3亿,需要将该表数据导入到hbase
尝试了两种方式
- 新建hive表hbase_user_tag_detail,关联hbase表user_tag_detail,将user_tag_detail数据导入hbase_user_tag_detail
- 生成hfile,使用HBaseBulkLoad导入 官网文档 https://cwiki.apache.org/confluence/display/Hive/HBaseBulkLoad#HBaseBulkLoad-FollowupsNeeded
方式一
创建关联表
-- Hive table backed by the HBase table user_tag_detail; :key maps the Hive
-- id column to the HBase rowkey, remaining columns land in family cf.
create table hbase_user_tag_detail(id string,name string ....)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
-- Fix: the original mapping string was missing its closing double quote,
-- and "stored by" was missing the space before the handler class name.
with serdeproperties("hbase.columns.mapping" = ":key,cf:id,cf:name")
tblproperties ("hbase.table.name" = "user_tag_detail");
插入数据
insert overwrite table hbase_user_tag_info select * from user_tag_info;
方式二
脚本如下
#!/bin/bash
# Hive table to export to HBase
# NOTE(review): the background text says the source table is user_tag_detail,
# but user_info is configured here — confirm which table this run targets.
hive_table=user_info
# Primary-key column of the Hive table, used as the HBase rowkey
rowkey=id
# HDFS staging directory for the generated hfiles
# NOTE(review): later steps hardcode /user/test/... paths instead of using
# this variable — keep them in sync if the path changes.
hfile_path=/user/test/
echo "##################################[step 1 generate splites]#####################################"
# Sampling interval for region split points: every split_interval-th sampled
# row becomes a split. The default 1 keeps the original behavior (every row
# is a split point), but for a large table this produces far too many splits
# / reducers — raise it (e.g. total_rows / desired_regions) so only a few
# dozen split points are emitted.
split_interval=1
# hbase_splits holds the chosen split keys plus their sequence number;
# hbase_splits_file re-serializes them in the binary-sortable SequenceFile
# format that TotalOrderPartitioner consumes in step 3.
/etc/alternatives/beeline -u 'jdbc:hive2://dn1.hadoop.pdbd.prod:10000' -nhive -phive -e "
use test;
CREATE EXTERNAL TABLE IF NOT EXISTS hbase_splits(partition STRING, count int)
PARTITIONED BY (table STRING);
create temporary function row_sequence as 'org.apache.hadoop.hive.contrib.udf.UDFRowSequence';
INSERT OVERWRITE TABLE hbase_splits
PARTITION (table='${hive_table}')
select ${rowkey},row_sequence() from (
select
${rowkey},
row_sequence() as row
from (
select
${rowkey}
from ${hive_table} tablesample(bucket 1 out of 1 on ${rowkey}) s order by ${rowkey}
) t order by ${rowkey}
) x where (row % ${split_interval})=0 order by ${rowkey} ;
CREATE EXTERNAL TABLE IF NOT EXISTS hbase_splits_file(partition STRING)
PARTITIONED BY (table STRING)
ROW FORMAT
SERDE 'org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveNullValueSequenceFileOutputFormat'
LOCATION '/user/test/hbase_splits_file';
INSERT OVERWRITE TABLE hbase_splits_file
PARTITION (table='${hive_table}')
select partition from hbase_splits where table='${hive_table}';
"
echo "##################################[step 2 create hfile table ]#####################################"
echo "DEBUG: table name is: "${hive_table}
sql_select_col="CREATE EXTERNAL TABLE IF NOT EXISTS hbase_${hive_table}("
# Fetch the column list of the source table. beeline prints an ASCII table
# ("| col_name | type |"), which is why the awk field positions below are
# $2 (name) and $4 (type).
desc_table_cols=$(/etc/alternatives/beeline -u 'jdbc:hive2://dn1.hadoop.pdbd.prod:10000' -nhive -phive -e "
use test;
desc ${hive_table};
")
# mktemp without -u actually creates the file, avoiding the create race of
# -u; GNU mktemp also requires the X's at the end of the template, so the
# original "temp.user.XXXXXX.$$" form would be rejected.
temp_file=$(mktemp temp.user.XXXXXX)
echo "$desc_table_cols" > "${temp_file}"
while read line
do
# Keep only rows whose type mentions string/int; the border and header rows
# of the beeline output do not match either pattern.
if [[ ${line} =~ "string" ]] || [[ ${line} =~ "int" ]]
then
col_name=$(echo "${line}"|awk -F ' ' '{print $2}')
col_type=$(echo "${line}"|awk -F ' ' '{print $4}')
echo "DEBUG:col_name:"${col_name}
echo "DEBUG:col_type:"${col_type}
sql_select_col="${sql_select_col}${col_name} ${col_type},";
fi
done < "${temp_file}"
# A single regular file: rm -f is sufficient (and safer than rm -rf).
rm -f "${temp_file}"
# Drop the trailing comma left by the loop; parameter expansion replaces the
# original expr/cut round-trip.
sql_select_col="${sql_select_col%,}"
sql_select_col=${sql_select_col}") STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.hbase.HiveHFileOutputFormat' TBLPROPERTIES('hfile.family.path' = '/user/test/hbsort/cf');"
echo "DEBUG: cols:"${sql_select_col}
/etc/alternatives/beeline -u 'jdbc:hive2://dn1.hadoop.pdbd.prod:10000' -nhive -phive -e "use test;
${sql_select_col};
"
echo "##################################[step 3 create hfile ]#####################################"
# max(count)+1 = number of reducers, so each reducer writes the hfile for
# exactly one region (one split interval).
task_num=$(
/etc/alternatives/beeline -u 'jdbc:hive2://dn1.hadoop.pdbd.prod:10000' -nhive -phive -e "
use test;
select max(count) + 1 from hbase_splits where table='${hive_table}';
"
)
# Flatten the beeline table output onto one line and take the 7th
# whitespace-separated field, where the value sits in this output format.
# NOTE(review): fragile — tied to this exact beeline rendering.
task_num_str=$(echo ${task_num})
num=$(echo "${task_num_str}" | awk '{print $7}')
echo ${num}
# Bail out if parsing failed; otherwise a garbage value would be passed
# silently into SET mapred.reduce.tasks below.
if ! [[ ${num} =~ ^[0-9]+$ ]]
then
echo "ERROR:@@@@@@ failed to parse reducer count from beeline output @@@@@@"
exit 1
fi
/etc/alternatives/beeline -u 'jdbc:hive2://dn1.hadoop.pdbd.prod:10000' -nhive -phive -e "
USE test;
SET mapred.reduce.tasks=${num};
SET total.order.partitioner.path=/user/test/hbase_splits_file;
SET hive.mapred.partitioner=org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
INSERT OVERWRITE TABLE hbase_${hive_table}
SELECT * FROM ${hive_table} CLUSTER BY ${rowkey};
"
status=$?
echo status=${status}
if [ ${status} -eq 0 ];
then
echo "##################################[step 4 create hbase table ]#####################################"
#create 'testtable', { NAME => 'colfam1', COMPRESSION => 'GZ' }
# Fix: the column family MUST match the directory name in hfile.family.path
# (/user/test/hbsort/cf, i.e. family "cf"); the original created family "f",
# which makes completebulkload reject every hfile.
echo "create '${hive_table}', {NAME => 'cf', COMPRESSION => 'GZ'}" | hbase shell
echo "##################################[step 5 move hfile to hbase ]#####################################"
hadoop jar /opt/cloudera/parcels/CDH-5.15.0-1.cdh5.15.0.p0.21/jars/hbase-server-1.2.0-cdh5.15.0.jar completebulkload -Dhbase.zookeeper.quorum=10.1.39.99 -Dhbase.zookeeper.property.clientPort=2181 /user/test/hbsort ${hive_table}
echo "##################################[step 6 test ]#####################################"
echo "scan '${hive_table}', { LIMIT => 1 }" | hbase shell
else
echo "ERROR:@@@@@@ generate hfile error @@@@@@";
# Fix: exit statuses are 0-255; -1 is not portable (wraps to 255 at best).
exit 1;
fi
- 执行hadoop jar 的时候有一堆的类找不到 ,需要将hbase下的jar包添加到HADOOP_CLASSPATH下(环境变量名区分大小写,必须大写), export HADOOP_CLASSPATH=/hbase/…jar
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。