《hbase学习》-02-程序批量put数据到Hbase
1.在hbase中建立表格
create 'test_lcc_mycase','case_lizu'
2。编写生成测试数据的代码
package sparksql.test.domain;
public class Mycase {

    // One synthetic case record; toString() renders it as a single CSV line.
    private String c_code;   // case id, e.g. "A12345"
    private String c_rcode;  // district code, rendered as a string
    private String c_region; // district name
    private String c_cate;   // case category name
    private String c_start;  // formatted start time
    private String c_end;    // formatted end time
    private long c_start_m;  // start time in epoch milliseconds
    private long c_end_m;    // end time in epoch milliseconds
    private String c_name;   // case name
    private String c_mark;   // free-form remark

    /**
     * Serializes all fields, in declaration order, as one comma-separated
     * line terminated by CRLF. Null string fields render as "null".
     */
    @Override
    public String toString() {
        StringBuilder line = new StringBuilder();
        line.append(c_code).append(',')
            .append(c_rcode).append(',')
            .append(c_region).append(',')
            .append(c_cate).append(',')
            .append(c_start).append(',')
            .append(c_end).append(',')
            .append(c_start_m).append(',')
            .append(c_end_m).append(',')
            .append(c_name).append(',')
            .append(c_mark).append("\r\n");
        return line.toString();
    }

    public String getC_code() {
        return c_code;
    }

    public void setC_code(String c_code) {
        this.c_code = c_code;
    }

    public String getC_rcode() {
        return c_rcode;
    }

    public void setC_rcode(String c_rcode) {
        this.c_rcode = c_rcode;
    }

    public String getC_region() {
        return c_region;
    }

    public void setC_region(String c_region) {
        this.c_region = c_region;
    }

    public String getC_cate() {
        return c_cate;
    }

    public void setC_cate(String c_cate) {
        this.c_cate = c_cate;
    }

    public String getC_start() {
        return c_start;
    }

    public void setC_start(String c_start) {
        this.c_start = c_start;
    }

    public String getC_end() {
        return c_end;
    }

    public void setC_end(String c_end) {
        this.c_end = c_end;
    }

    public long getC_start_m() {
        return c_start_m;
    }

    public void setC_start_m(long c_start_m) {
        this.c_start_m = c_start_m;
    }

    public long getC_end_m() {
        return c_end_m;
    }

    public void setC_end_m(long c_end_m) {
        this.c_end_m = c_end_m;
    }

    public String getC_name() {
        return c_name;
    }

    public void setC_name(String c_name) {
        this.c_name = c_name;
    }

    public String getC_mark() {
        return c_mark;
    }

    public void setC_mark(String c_mark) {
        this.c_mark = c_mark;
    }
}
package sparksql.test.files;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import sparksql.test.domain.Mycase;
public class MyCaseFile {

    /**
     * Generates 100 test-data files (mycase1.txt .. mycase100.txt) under
     * E:/temp, each containing 1,000,000 synthetic case records as UTF-8
     * CSV lines produced by {@link Mycase#toString()}.
     *
     * <p>Fixes over the original: the writer is closed via try-with-resources
     * (previously only the raw stream was closed and never on error), the
     * deprecated {@code new Long(String)} round-trips are gone, and the
     * unused byte buffer / dead commented code were removed.
     *
     * @throws IOException if a file cannot be created or written
     */
    public static void main(String[] args) throws ParseException, IOException {
        // Single-threaded use, so one SimpleDateFormat instance is safe here.
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");
        Random r = new Random();
        Mycase mycase = new Mycase();
        for (int k = 1; k <= 100; k++) {
            File file = new File("E:/temp", "mycase" + k + ".txt");
            file.createNewFile(); // create the target file if it does not exist
            // Every file restarts from the same base timestamp, 1150992000000 ms
            // (2006/06/23 00:00:00 in the local zone), then drifts forward.
            long start_m = 1150992000000L;
            // try-with-resources guarantees the writer (and the underlying
            // stream) is closed even if a write fails.
            try (OutputStreamWriter osw =
                     new OutputStreamWriter(new FileOutputStream(file), "UTF-8")) {
                // One million records per file, with globally unique ids A<i>.
                for (int i = 1000000 * k - 1000000; i < 1000000 * k; i++) {
                    mycase.setC_code("A" + i);
                    // District code in [0, 9): nine districts are modeled
                    // (the original comment claiming [0,10) / 13 was wrong).
                    int qu = r.nextInt(9);
                    mycase.setC_rcode(qu + "");
                    mycase.setC_region(regionFor(qu));
                    // Case category code in [0, 4).
                    mycase.setC_cate(categoryFor(r.nextInt(4)));
                    // Advance the start time by 0-4 whole days; the end time
                    // is the start plus 0-23 whole hours.
                    int day = r.nextInt(5);
                    int hour = r.nextInt(24);
                    start_m = start_m + 86400000L * day;
                    long end_m = start_m + 3600000L * hour;
                    mycase.setC_start(simpleDateFormat.format(new Date(start_m)));
                    mycase.setC_end(simpleDateFormat.format(new Date(end_m)));
                    mycase.setC_start_m(start_m);
                    mycase.setC_end_m(end_m);
                    mycase.setC_name("案件名称" + i);
                    mycase.setC_mark("暂无");
                    osw.write(mycase.toString());
                }
                osw.flush();
            }
        }
    }

    /**
     * Maps a district code in [0, 8] to its district name. Codes outside the
     * range fall through to the catch-all district with a console warning
     * (unreachable with the nextInt(9) caller above).
     */
    static String regionFor(int qu) {
        switch (qu) {
            case 1: return "杭州市上城区";
            case 2: return "杭州市下城区";
            case 3: return "杭州市拱墅区";
            case 4: return "杭州市江干区";
            case 5: return "杭州市西湖区";
            case 6: return "杭州市滨江区";
            case 7: return "杭州市萧山区";
            case 8: return "杭州市余杭区";
            case 0: return "杭州市其他区";
            default:
                System.out.println(qu + "没有对应的区");
                return "杭州市其他区";
        }
    }

    /**
     * Maps a category code in [0, 3] to its category name. The original
     * default branch left the previous record's category in place (a latent
     * bug with the reused Mycase instance); it now returns a defined value.
     */
    static String categoryFor(int cate) {
        switch (cate) {
            case 0: return "刑事案件";
            case 1: return "盗窃案件";
            case 2: return "强奸案件";
            case 3: return "杀人案件";
            default:
                System.out.println(cate + "没有对应的案件类别");
                return "其他案件";
        }
    }
}
3。生成测试数据
A0,6,杭州市滨江区,盗窃案件,2006/06/23 00:00:00,2006/06/23 10:00:00,1150992000000,1151028000000,案件名称0,暂无
A1,3,杭州市拱墅区,盗窃案件,2006/06/24 00:00:00,2006/06/24 07:00:00,1151078400000,1151103600000,案件名称1,暂无
A2,2,杭州市下城区,盗窃案件,2006/06/27 00:00:00,2006/06/27 09:00:00,1151337600000,1151370000000,案件名称2,暂无
A3,3,杭州市拱墅区,盗窃案件,2006/07/01 00:00:00,2006/07/01 07:00:00,1151683200000,1151708400000,案件名称3,暂无
A4,0,杭州市其他区,强奸案件,2006/07/05 00:00:00,2006/07/05 01:00:00,1152028800000,1152032400000,案件名称4,暂无
A5,4,杭州市江干区,杀人案件,2006/07/08 00:00:00,2006/07/08 23:00:00,1152288000000,1152370800000,案件名称5,暂无
A6,7,杭州市萧山区,盗窃案件,2006/07/11 00:00:00,2006/07/11 23:00:00,1152547200000,1152630000000,案件名称6,暂无
A7,6,杭州市滨江区,刑事案件,2006/07/13 00:00:00,2006/07/13 22:00:00,1152720000000,1152799200000,案件名称7,暂无
A8,2,杭州市下城区,杀人案件,2006/07/13 00:00:00,2006/07/13 22:00:00,1152720000000,1152799200000,案件名称8,暂无
A9,1,杭州市上城区,杀人案件,2006/07/15 00:00:00,2006/07/15 21:00:00,1152892800000,1152968400000,案件名称9,暂无
4. 注意:所有步骤(建表、生成文件、读写)都必须统一使用 UTF-8 编码,否则会出现乱码问题
5。编写上传代码
package sparksql.hbase.fileToHbase;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
public class MycaseUpToHbase {

    /** Single column family that holds every column of a case record. */
    private static final byte[] FAMILY = Bytes.toBytes("case_lizu");

    /** Column qualifiers, in the same order as the CSV fields. */
    private static final String[] QUALIFIERS = {
        "c_code", "c_rcode", "c_region", "c_cate", "c_start",
        "c_end", "c_start_m", "c_end_m", "c_name", "c_mark"
    };

    /**
     * Reads the 100 generated UTF-8 CSV files and batch-puts every record
     * into the HBase table {@code test_lcc_mycase}, using c_code as the row key.
     *
     * <p>Fixes over the original: the leftover {@code table.put(putList)} used
     * to run inside the per-file loop WITHOUT clearing the list, so leftovers
     * from file k were re-sent with every later file; the reader is now closed
     * via try-with-resources; the per-line console print (millions of lines)
     * is replaced with a per-file summary; and {@code Bytes.toBytes(String)}
     * is used throughout, which always encodes UTF-8 — important for the
     * Chinese column values.
     *
     * @throws IOException on HBase or file I/O failure
     */
    public static void main(String[] args) throws IOException {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.zookeeper.quorum", "192.168.10.82");
        configuration.set("hbase.client.write.buffer", "2097152");
        // Default Connection impl is HConnectionImplementation.
        Connection connection = ConnectionFactory.createConnection(configuration);
        // Default Table impl is HTable.
        Table table = connection.getTable(TableName.valueOf("test_lcc_mycase"));
        // 3177 is not arbitrary: it is 2 * hbase.client.write.buffer / put.heapSize().
        final int bestBatchPutSize = 3177;
        try {
            List<Put> putList = new ArrayList<Put>(bestBatchPutSize);
            for (int k = 1; k <= 100; k++) {
                File file = new File("E:/temp/data1000000-10000000-10000000/mycase" + k + ".txt");
                int lines = 0;
                // Reading as UTF-8 matters: the files were written as UTF-8.
                try (BufferedReader reader = new BufferedReader(
                         new InputStreamReader(new FileInputStream(file), "UTF-8"))) {
                    String row;
                    while ((row = reader.readLine()) != null) {
                        String[] fields = row.split(",");
                        // Row key is c_code (e.g. "A12345").
                        Put put = new Put(Bytes.toBytes(fields[0]));
                        for (int c = 0; c < QUALIFIERS.length; c++) {
                            put.addImmutable(FAMILY, Bytes.toBytes(QUALIFIERS[c]),
                                Bytes.toBytes(fields[c]));
                        }
                        // Skip the WAL for bulk-load speed; the source files
                        // remain the recovery path if a region server dies.
                        put.setDurability(Durability.SKIP_WAL);
                        putList.add(put);
                        if (putList.size() == bestBatchPutSize) {
                            // Reached the optimal batch size: submit now.
                            table.put(putList);
                            putList.clear();
                        }
                        lines++;
                    }
                }
                System.out.println(file.getName() + ": " + lines + " lines queued");
            }
            // Submit whatever remains after the last file, exactly once.
            if (!putList.isEmpty()) {
                table.put(putList);
                putList.clear();
            }
        } finally {
            table.close();
            connection.close();
        }
    }
}
6。执行上传代码,进行编写scala读取操作
package sparlsql.hbase;
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
import java.util.Date
object SparkSQLOnHbase {

  /**
   * Full-scans the HBase table `test_lcc_mycase` via TableInputFormat,
   * projects the `case_lizu` family columns into a DataFrame, runs a
   * `select *` over it, and prints the result plus the elapsed time (ms).
   *
   * Fixes over the original: `hbaseRDD` is a `val` (it was a never-reassigned
   * `var`), dead commented-out connection code is removed, and the repetitive
   * per-column extraction is factored into a helper defined INSIDE the map
   * lambda, so the closure captures nothing extra.
   */
  def main(args: Array[String]): Unit = {
    val startTime = System.nanoTime
    // Local mode for easy testing; use a real master URL in production.
    val sparkConf = new SparkConf().setMaster("local").setAppName("HBaseTest")
    // HBase connection settings plus the table TableInputFormat should scan.
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hBaseConf.set("hbase.zookeeper.quorum", "192.168.10.82")
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "test_lcc_mycase")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // Scan the table as an RDD of (rowkey, Result) pairs.
    val hbaseRDD = sc.newAPIHadoopRDD(
      hBaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])
    // Map each Result to a tuple of its column values, then to a DataFrame.
    // NOTE(review): getValue returns null for a missing cell, so absent
    // columns surface as null strings in the DataFrame.
    val mycase = hbaseRDD.map { r =>
      def col(q: String): String =
        Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"), Bytes.toBytes(q)))
      (col("c_code"), col("c_rcode"), col("c_region"), col("c_cate"), col("c_start"),
        col("c_end"), col("c_start_m"), col("c_end_m"), col("c_name"), col("c_mark"))
    }.toDF("c_code", "c_rcode", "c_region", "c_cate", "c_start",
      "c_end", "c_start_m", "c_end_m", "c_name", "c_mark")
    mycase.registerTempTable("mycase")
    // Sanity check: dump the table and its row count.
    val df5 = sqlContext.sql("select * from mycase ")
    df5.show()
    println(df5.count())
    val endTime = System.nanoTime
    println((endTime - startTime) / 1000000d)
  }
}
7.打印结果
+------+-------+--------+------+-------------------+-------------------+-------------+-------------+--------+------+
|c_code|c_rcode|c_region|c_cate| c_start| c_end| c_start_m| c_end_m| c_name|c_mark|
+------+-------+--------+------+-------------------+-------------------+-------------+-------------+--------+------+
| A0| 5| 杭州市西湖区| 强奸案件|2006/06/24 00:00:00|2006/06/24 13:00:00|1151078400000|1151125200000| 案件名称0| 暂无|
| A1| 1| 杭州市上城区| 强奸案件|2006/06/25 00:00:00|2006/06/25 22:00:00|1151164800000|1151244000000| 案件名称1| 暂无|
| A10| 6| 杭州市滨江区| 刑事案件|2006/07/06 00:00:00|2006/07/06 20:00:00|1152115200000|1152187200000| 案件名称10| 暂无|
| A100| 1| 杭州市上城区| 杀人案件|2006/12/31 00:00:00|2006/12/31 10:00:00|1167494400000|1167530400000| 案件名称100| 暂无|
| A1000| 6| 杭州市滨江区| 强奸案件|2011/12/25 00:00:00|2011/12/25 03:00:00|1324742400000|1324753200000|案件名称1000| 暂无|
| A1001| 4| 杭州市江干区| 强奸案件|2011/12/29 00:00:00|2011/12/29 03:00:00|1325088000000|1325098800000|案件名称1001| 暂无|
| A1002| 8| 杭州市余杭区| 盗窃案件|2011/12/30 00:00:00|2011/12/30 09:00:00|1325174400000|1325206800000|案件名称1002| 暂无|
| A1003| 1| 杭州市上城区| 刑事案件|2011/12/31 00:00:00|2011/12/31 08:00:00|1325260800000|1325289600000|案件名称1003| 暂无|
| A1004| 4| 杭州市江干区| 强奸案件|2011/12/31 00:00:00|2011/12/31 22:00:00|1325260800000|1325340000000|案件名称1004| 暂无|
| A1005| 7| 杭州市萧山区| 刑事案件|2012/01/04 00:00:00|2012/01/04 08:00:00|1325606400000|1325635200000|案件名称1005| 暂无|
| A1006| 2| 杭州市下城区| 杀人案件|2012/01/06 00:00:00|2012/01/06 20:00:00|1325779200000|1325851200000|案件名称1006| 暂无|
| A1007| 4| 杭州市江干区| 刑事案件|2012/01/07 00:00:00|2012/01/07 02:00:00|1325865600000|1325872800000|案件名称1007| 暂无|
| A1008| 1| 杭州市上城区| 强奸案件|2012/01/09 00:00:00|2012/01/09 23:00:00|1326038400000|1326121200000|案件名称1008| 暂无|
| A1009| 3| 杭州市拱墅区| 强奸案件|2012/01/09 00:00:00|2012/01/09 15:00:00|1326038400000|1326092400000|案件名称1009| 暂无|
| A101| 0| 杭州市其他区| 刑事案件|2007/01/02 00:00:00|2007/01/02 05:00:00|1167667200000|1167685200000| 案件名称101| 暂无|
| A1010| 1| 杭州市上城区| 强奸案件|2012/01/11 00:00:00|2012/01/11 14:00:00|1326211200000|1326261600000|案件名称1010| 暂无|
| A1011| 7| 杭州市萧山区| 盗窃案件|2012/01/15 00:00:00|2012/01/15 12:00:00|1326556800000|1326600000000|案件名称1011| 暂无|
| A1012| 3| 杭州市拱墅区| 刑事案件|2012/01/15 00:00:00|2012/01/15 11:00:00|1326556800000|1326596400000|案件名称1012| 暂无|
| A1013| 0| 杭州市其他区| 强奸案件|2012/01/17 00:00:00|2012/01/17 06:00:00|1326729600000|1326751200000|案件名称1013| 暂无|
| A1014| 6| 杭州市滨江区| 刑事案件|2012/01/18 00:00:00|2012/01/18 15:00:00|1326816000000|1326870000000|案件名称1014| 暂无|
+------+-------+--------+------+-------------------+-------------------+-------------+-------------+--------+------+
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。