欢迎投稿

今日深度:

《hbase学习》-02-程序批量put数据到Hbase,《hbase学习》-02-

《hbase学习》-02-程序批量put数据到Hbase,《hbase学习》-02-


1.在hbase中建立表格

create 'test_lcc_mycase','case_lizu'

2。编写生成测试数据的代码

package sparksql.test.domain;

public class Mycase {
    private String c_code  ;
    private String c_rcode ;
    private String c_region;
    private String c_cate  ;
    private String c_start ;
    private String c_end   ;
    private long c_start_m ;
    private long c_end_m ;
    private String c_name  ;
    private String c_mark  ;



    @Override
    public String toString() {
        return  c_code + ","
                + c_rcode + "," 
                + c_region + ","
                + c_cate+ "," 
                + c_start + ","
                + c_end + "," 
                + c_start_m + "," 
                + c_end_m + "," 
                + c_name + ","
                + c_mark + "\r\n";
    }
    public String getC_code() {
        return c_code;
    }
    public void setC_code(String c_code) {
        this.c_code = c_code;
    }
    public String getC_rcode() {
        return c_rcode;
    }
    public void setC_rcode(String c_rcode) {
        this.c_rcode = c_rcode;
    }
    public String getC_region() {
        return c_region;
    }
    public void setC_region(String c_region) {
        this.c_region = c_region;
    }
    public String getC_cate() {
        return c_cate;
    }
    public void setC_cate(String c_cate) {
        this.c_cate = c_cate;
    }
    public String getC_start() {
        return c_start;
    }
    public void setC_start(String c_start) {
        this.c_start = c_start;
    }
    public String getC_end() {
        return c_end;
    }
    public void setC_end(String c_end) {
        this.c_end = c_end;
    }
    public long getC_start_m() {
        return c_start_m;
    }
    public void setC_start_m(long c_start_m) {
        this.c_start_m = c_start_m;
    }
    public long getC_end_m() {
        return c_end_m;
    }
    public void setC_end_m(long c_end_m) {
        this.c_end_m = c_end_m;
    }
    public String getC_name() {
        return c_name;
    }
    public void setC_name(String c_name) {
        this.c_name = c_name;
    }
    public String getC_mark() {
        return c_mark;
    }
    public void setC_mark(String c_mark) {
        this.c_mark = c_mark;
    }
}
package sparksql.test.files;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

import sparksql.test.domain.Mycase;





public class MyCaseFile {

     public static void main(String[] args) throws ParseException, IOException {  
            // TODO Auto-generated method stub  
            // file(内存)----输入流---->【程序】----输出流---->file(内存)  


            for(int k=1;k<=100;k++){
                String fileName =  "mycase"+k+".txt";
                File file = new File("E:/temp",fileName );  

                file.createNewFile(); // 创建文件  


                FileOutputStream in = new FileOutputStream(file);  

                OutputStreamWriter osw = new OutputStreamWriter(in, "UTF-8"); 



                Mycase mycase = new Mycase();
                Random r = new Random();


                long lt = 0;
                Date datetwo = null;
                String start;
                String end;
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss");


                byte bt[] = new byte[1024];  

                Long start_m = Long.parseLong("1150992000000");
                /*
                 * 1000 0000
                 * 
                 */
                //案件数量
                for(int i=1000000*k-1000000;i<1000000*k;i++){
                     mycase.setC_code("A"+i);

                     //生成[0,10)区间的整数
                     //假设有13个区  
                     int qu = r.nextInt(9);

                     mycase.setC_rcode(qu+"");
                     switch (qu) {
                     case 1:
                         mycase.setC_region("杭州市上城区");
                         break;
                     case 2:
                         mycase.setC_region("杭州市下城区");
                         break;
                     case 3:
                         mycase.setC_region("杭州市拱墅区");
                         break;
                     case 4:
                         mycase.setC_region("杭州市江干区");
                         break;
                     case 5:
                         mycase.setC_region("杭州市西湖区");
                         break;
                     case 6:
                         mycase.setC_region("杭州市滨江区");
                         break;
                     case 7:
                         mycase.setC_region("杭州市萧山区");
                         break;
                     case 8:
                         mycase.setC_region("杭州市余杭区");
                         break;
                     case 0:
                         mycase.setC_region("杭州市其他区");
                         break;
                     default:
                         mycase.setC_region("杭州市其他区");
                         System.out.println(qu+"没有对应的区");
                     }


                     //假设有3个案件类别
                     int c_cate = r.nextInt(4);


                     switch (c_cate) {
                     case 0:
                         mycase.setC_cate("刑事案件");
                         break;
                     case 1:
                         mycase.setC_cate("盗窃案件");
                         break;
                     case 2:
                         mycase.setC_cate("强奸案件");
                         break; 
                     case 3:
                         mycase.setC_cate("杀人案件");
                         break;
                     default:
                         System.out.println(c_cate+"没有对应的案件类别");
                     }



                    int day = r.nextInt(5);
                    int our = r.nextInt(24);

                    start_m = start_m +86400000*day;
                    String shijiancuo = start_m+"";
                    lt = new Long(shijiancuo);
                    datetwo = new Date(lt);
                    start = simpleDateFormat.format(datetwo);

                    Long end_m = start_m +3600000*our;
                    String shijiancuo2 = end_m+"";
                    lt = new Long(shijiancuo2);
                    datetwo = new Date(lt);
                    end = simpleDateFormat.format(datetwo);

                   mycase.setC_start(start);
                   mycase.setC_end(end);
                   mycase.setC_start_m(start_m);
                   mycase.setC_end_m(end_m);

                   mycase.setC_name("案件名称"+i);
                   mycase.setC_mark("暂无");


                   // 向文件写入内容(输出流)  
                    String str = mycase.toString() ;  

                    osw.write(str); 
                    osw.flush(); 

                   /* bt = str.getBytes();  

                    in.write(bt, 0, bt.length);  */

                }

                in.close(); 
            }

     }

}

3。生成测试数据

A0,6,杭州市滨江区,盗窃案件,2006/06/23 00:00:00,2006/06/23 10:00:00,1150992000000,1151028000000,案件名称0,暂无
A1,3,杭州市拱墅区,盗窃案件,2006/06/24 00:00:00,2006/06/24 07:00:00,1151078400000,1151103600000,案件名称1,暂无
A2,2,杭州市下城区,盗窃案件,2006/06/27 00:00:00,2006/06/27 09:00:00,1151337600000,1151370000000,案件名称2,暂无
A3,3,杭州市拱墅区,盗窃案件,2006/07/01 00:00:00,2006/07/01 07:00:00,1151683200000,1151708400000,案件名称3,暂无
A4,0,杭州市其他区,强奸案件,2006/07/05 00:00:00,2006/07/05 01:00:00,1152028800000,1152032400000,案件名称4,暂无
A5,4,杭州市江干区,杀人案件,2006/07/08 00:00:00,2006/07/08 23:00:00,1152288000000,1152370800000,案件名称5,暂无
A6,7,杭州市萧山区,盗窃案件,2006/07/11 00:00:00,2006/07/11 23:00:00,1152547200000,1152630000000,案件名称6,暂无
A7,6,杭州市滨江区,刑事案件,2006/07/13 00:00:00,2006/07/13 22:00:00,1152720000000,1152799200000,案件名称7,暂无
A8,2,杭州市下城区,杀人案件,2006/07/13 00:00:00,2006/07/13 22:00:00,1152720000000,1152799200000,案件名称8,暂无
A9,1,杭州市上城区,杀人案件,2006/07/15 00:00:00,2006/07/15 21:00:00,1152892800000,1152968400000,案件名称9,暂无

4。记得所有步骤全是UTF-8不然会出现乱码问题
5。编写上传代码

package sparksql.hbase.fileToHbase;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class MycaseUpToHbase {

    public static void main(String[] args) throws IOException  {  
        Configuration configuration = HBaseConfiguration.create();   

        configuration.set("hbase.zookeeper.property.clientPort", "2181");  
        configuration.set("hbase.zookeeper.quorum", "192.168.10.82");  
        configuration.set("hbase.client.write.buffer", "2097152");      

        //默认connection实现是org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation  
        Connection connection = ConnectionFactory.createConnection(configuration);      
        //默认table实现是org.apache.hadoop.hbase.client.HTable  
        Table table = connection.getTable(TableName.valueOf("test_lcc_mycase"));   

        //3177不是我杜撰的,是2*hbase.client.write.buffer/put.heapSize()计算出来的   
        int bestBathPutSize = 3177;     

        try {      
          // Use the table as needed, for a single operation and a single thread      
          // construct List<Put> putLists      
            List<Put> putLists = new ArrayList<Put>();    
            for(int k=1;k<=100;k++){
                String fileName =  "mycase"+k+".txt";

            String filePath = "E:/temp/data1000000-10000000-10000000/"+fileName;
            File file = new File(filePath);
            BufferedReader reader = null;
            System.out.println("以行为单位读取文件内容,一次读一整行:");

            InputStreamReader insReader = new InputStreamReader( new FileInputStream(file), "UTF-8");  
            reader = new BufferedReader(insReader);

            String tempString = null;
            int line = 1;
            // 一次读入一行,直到读入null为文件结束
            while ((tempString = reader.readLine()) != null) {
                // 显示行号
                System.out.println("line " + line + ": " + tempString);
                 String[] array =  tempString.split(",");

                 String c_code  = array[0];
                 String c_rcode   = array[1];
                 String c_region  = array[2];
                 String c_cate    = array[3];
                 String c_start   = array[4];
                 String c_end     = array[5];
                 String c_start_m   = array[6];
                 String c_end_m   = array[7];
                 String c_name    = array[8];
                 String c_mark    = array[9];

                Put put=new Put(Bytes.toBytes(c_code));  
                //Put put = new Put(rowkey.getBytes());    
                put.addImmutable("case_lizu".getBytes(), "c_code".getBytes(), c_code.getBytes("UTF-8"));    
                put.addImmutable("case_lizu".getBytes(), "c_rcode".getBytes(), c_rcode.getBytes("UTF-8"));    
                put.addImmutable("case_lizu".getBytes(), "c_region".getBytes(), c_region.getBytes("UTF-8"));    
                put.addImmutable("case_lizu".getBytes(), "c_cate".getBytes(), c_cate.getBytes("UTF-8"));    
                put.addImmutable("case_lizu".getBytes(), "c_start".getBytes(), c_start.getBytes("UTF-8"));    
                put.addImmutable("case_lizu".getBytes(), "c_end".getBytes(), c_end.getBytes("UTF-8"));    
                put.addImmutable("case_lizu".getBytes(), "c_start_m".getBytes(), c_start_m.getBytes("UTF-8"));    
                put.addImmutable("case_lizu".getBytes(), "c_end_m".getBytes(), c_end_m.getBytes("UTF-8"));    
                put.addImmutable("case_lizu".getBytes(), "c_name".getBytes(), c_name.getBytes("UTF-8"));    
                put.addImmutable("case_lizu".getBytes(), "c_mark".getBytes(), c_mark.getBytes("UTF-8"));    

   //这里面c_mark.getBytes("UTF-8") 这个指定utf-8很重要,不然会乱码

                put.setDurability(Durability.SKIP_WAL);  
                putLists.add(put);    

                if(putLists.size()==bestBathPutSize){    
                  //达到最佳大小值了,马上提交一把

                    table.put(putLists);    
                    putLists.clear();    
                }    

                line++;
            }
            reader.close();



          //剩下的未提交数据,最后做一次提交    
          table.put(putLists) ;     

            }
        } finally {      
          table.close();      
          connection.close();      
        } 
    }



}

6。执行上传代码,进行编写scala读取操作

package sparlsql.hbase;

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.{TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}

import java.util.Date  

object SparkSQLOnHbase {



  def main(args: Array[String]): Unit = {


         val starttime=System.nanoTime   


        // 本地模式运行,便于测试
        val sparkConf = new SparkConf().setMaster("local").setAppName("HBaseTest")

        // 创建hbase configuration
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set("hbase.zookeeper.property.clientPort", "2181");  
        hBaseConf.set("hbase.zookeeper.quorum", "192.168.10.82"); 
        //var con = ConnectionFactory.createConnection(hBaseConf)

        //var table = con.getTable(TableName.valueOf(""))

       hBaseConf.set(TableInputFormat.INPUT_TABLE,"test_lcc_mycase")

        // 创建 spark context
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // 从数据源获取数据
        var hbaseRDD = sc.newAPIHadoopRDD(hBaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

        // 将数据映射为表  也就是将 RDD转化为 dataframe schema
        val mycase = hbaseRDD.map(r=>(
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_code"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_rcode"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_region"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_cate"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_start"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_end"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_start_m"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_end_m"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_name"))),
            Bytes.toString(r._2.getValue(Bytes.toBytes("case_lizu"),Bytes.toBytes("c_mark")))

        )).toDF("c_code","c_rcode","c_region","c_cate","c_start","c_end","c_start_m","c_end_m","c_name","c_mark")

        mycase.registerTempTable("mycase")


        // 测试
        val df5 = sqlContext.sql("select * from  mycase  ")
        df5.show()
        println(df5.count())
       // df5.collect().foreach(print(_))




        val endtime=System.nanoTime  
        val delta=endtime-starttime  
        println(delta/1000000d)  

  }

}

7.打印结果

+------+-------+--------+------+-------------------+-------------------+-------------+-------------+--------+------+
|c_code|c_rcode|c_region|c_cate|            c_start|              c_end|    c_start_m|      c_end_m|  c_name|c_mark|
+------+-------+--------+------+-------------------+-------------------+-------------+-------------+--------+------+
|    A0|      5|  杭州市西湖区|  强奸案件|2006/06/24 00:00:00|2006/06/24 13:00:00|1151078400000|1151125200000|   案件名称0|    暂无|
|    A1|      1|  杭州市上城区|  强奸案件|2006/06/25 00:00:00|2006/06/25 22:00:00|1151164800000|1151244000000|   案件名称1|    暂无|
|   A10|      6|  杭州市滨江区|  刑事案件|2006/07/06 00:00:00|2006/07/06 20:00:00|1152115200000|1152187200000|  案件名称10|    暂无|
|  A100|      1|  杭州市上城区|  杀人案件|2006/12/31 00:00:00|2006/12/31 10:00:00|1167494400000|1167530400000| 案件名称100|    暂无|
| A1000|      6|  杭州市滨江区|  强奸案件|2011/12/25 00:00:00|2011/12/25 03:00:00|1324742400000|1324753200000|案件名称1000|    暂无|
| A1001|      4|  杭州市江干区|  强奸案件|2011/12/29 00:00:00|2011/12/29 03:00:00|1325088000000|1325098800000|案件名称1001|    暂无|
| A1002|      8|  杭州市余杭区|  盗窃案件|2011/12/30 00:00:00|2011/12/30 09:00:00|1325174400000|1325206800000|案件名称1002|    暂无|
| A1003|      1|  杭州市上城区|  刑事案件|2011/12/31 00:00:00|2011/12/31 08:00:00|1325260800000|1325289600000|案件名称1003|    暂无|
| A1004|      4|  杭州市江干区|  强奸案件|2011/12/31 00:00:00|2011/12/31 22:00:00|1325260800000|1325340000000|案件名称1004|    暂无|
| A1005|      7|  杭州市萧山区|  刑事案件|2012/01/04 00:00:00|2012/01/04 08:00:00|1325606400000|1325635200000|案件名称1005|    暂无|
| A1006|      2|  杭州市下城区|  杀人案件|2012/01/06 00:00:00|2012/01/06 20:00:00|1325779200000|1325851200000|案件名称1006|    暂无|
| A1007|      4|  杭州市江干区|  刑事案件|2012/01/07 00:00:00|2012/01/07 02:00:00|1325865600000|1325872800000|案件名称1007|    暂无|
| A1008|      1|  杭州市上城区|  强奸案件|2012/01/09 00:00:00|2012/01/09 23:00:00|1326038400000|1326121200000|案件名称1008|    暂无|
| A1009|      3|  杭州市拱墅区|  强奸案件|2012/01/09 00:00:00|2012/01/09 15:00:00|1326038400000|1326092400000|案件名称1009|    暂无|
|  A101|      0|  杭州市其他区|  刑事案件|2007/01/02 00:00:00|2007/01/02 05:00:00|1167667200000|1167685200000| 案件名称101|    暂无|
| A1010|      1|  杭州市上城区|  强奸案件|2012/01/11 00:00:00|2012/01/11 14:00:00|1326211200000|1326261600000|案件名称1010|    暂无|
| A1011|      7|  杭州市萧山区|  盗窃案件|2012/01/15 00:00:00|2012/01/15 12:00:00|1326556800000|1326600000000|案件名称1011|    暂无|
| A1012|      3|  杭州市拱墅区|  刑事案件|2012/01/15 00:00:00|2012/01/15 11:00:00|1326556800000|1326596400000|案件名称1012|    暂无|
| A1013|      0|  杭州市其他区|  强奸案件|2012/01/17 00:00:00|2012/01/17 06:00:00|1326729600000|1326751200000|案件名称1013|    暂无|
| A1014|      6|  杭州市滨江区|  刑事案件|2012/01/18 00:00:00|2012/01/18 15:00:00|1326816000000|1326870000000|案件名称1014|    暂无|
+------+-------+--------+------+-------------------+-------------------+-------------+-------------+--------+------+

www.htsjk.Com true http://www.htsjk.com/hbase/28445.html NewsArticle 《hbase学习》-02-程序批量put数据到Hbase,《hbase学习》-02- 1.在hbase中建立表格 create 'test_lcc_mycase' , 'case_lizu' 2。编写生成测试数据的代码 package sparksql.test.domain; public class Mycase { private St...
相关文章
    暂无相关文章
评论暂时关闭