欢迎投稿

今日深度:

spark批量导入hbase,sparkhbase

spark批量导入hbase,sparkhbase


方法1

使用hbase put方式,这种方式效率不高

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
 
val conf = HBaseConfiguration.create()
val tableName = "/t1"
conf.set(TableInputFormat.INPUT_TABLE, tableName)
 
val myTable = new HTable(conf, tableName);
var p = new Put();
p = new Put(new String("row999").getBytes());
p.add("cf".getBytes(), "column_name".getBytes(), new String("value999").getBytes());
myTable.put(p);
myTable.flushCommits();

方法2

先生成hfile文件再将hfile文件导入hbase 效率较高

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 
val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName) 
 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
 
// Generate 10 sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
 
// Save Hfiles on HDFS 
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], conf)
 
//Bulk load Hfiles to Hbase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/xxxx19"), table)

方法3

省去方法2的第三步

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
 
val conf = HBaseConfiguration.create()
val tableName = "hao"
val table = new HTable(conf, tableName) 
 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val job = Job.getInstance(conf)
job.setMapOutputKeyClass (classOf[ImmutableBytesWritable])
job.setMapOutputValueClass (classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad (job, table)
 
// Generate 10 sample data:
val num = sc.parallelize(1 to 10)
val rdd = num.map(x=>{
    val kv: KeyValue = new KeyValue(Bytes.toBytes(x), "cf".getBytes(), "c1".getBytes(), "value_xxx".getBytes() )
    (new ImmutableBytesWritable(Bytes.toBytes(x)), kv)
})
 
// Directly bulk load to Hbase/MapRDB tables.
rdd.saveAsNewAPIHadoopFile("/tmp/xxxx19", classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat], job.getConfiguration())


www.htsjk.Com true http://www.htsjk.com/hbase/29107.html NewsArticle spark批量导入hbase,sparkhbase 方法1 使用hbase put方式,这种方式效率不高 import org.apache.spark._import org.apache.spark.rdd.NewHadoopRDDimport org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}import org.ap...
相关文章
    暂无相关文章
评论暂时关闭