欢迎投稿

今日深度:

HBase批量写入数据,

HBase批量写入数据,


HBase是一个高可靠性、高性能、列式分布式数据库,HBase基于HDFS来存储并处理大型的数据,没有update,但可以追加合并数据;可以有多个主节点 Hmaster, 可以有多个从节点 Hregionserver, 是N+N关系;特点是写快读慢。在需要实时读写,随机访问超大规模数据集时,可以使用HBase.

一、HBase与行式数据区别:
1、行式数据库是一行一行的查数据,进行匹配;列式数据库是把行式数据库的列转换为了行,没有多表关联,没有事务,只能存海量数据。
2、HBase一张表可能达到上亿行,上百万列;对于为空(null)的列,不占用存储空间,稀疏性很好;面向列(族)的存储和权限控制,针对列(族)进行独立检索。

二、HBase 与HDFS关系
当一条一条数据插入时,HDFS的namenode会吃力,所以先缓存到HBASE(Hmaster/Hregionserver 主从)的Hregionserver内存, 当达到128M,再写到hdfs。

HDFS最适于执行批次分析。然而,它最大的缺点是无法执行实时分析,而实时分析是信息科技行业的标配。HBase它不适于批次分析,但能够处理大规模数据,可以向Hadoop实时地调用数据。

三、HBase单条写入数据

public class HbaseTest {
   /**
     * 配置ss
     */
    static Configuration config = null;
    private Connection connection = null;
    private Table table = null;

	@Before
	public void init() throws Exception {
		config = HBaseConfiguration.create();// 配置
		config.set("hbase.zookeeper.quorum", "master1-vsp.com");// zookeeper地址
		config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
		connection = ConnectionFactory.createConnection(config);
		table = connection.getTable(TableName.valueOf("test4"));//表名
	}

	/**
	 * 创建一个表
	 * 
	 * @throws Exception
	 */
	@Test
	public void createTable() throws Exception {
		// 创建表管理类
		HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
		// 创建表描述类
		TableName tableName = TableName.valueOf("test4"); // 表名称
		HTableDescriptor desc = new HTableDescriptor(tableName);
		// 创建列族的描述类
		HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
		// 将列族添加到表中
		desc.addFamily(family);
		HColumnDescriptor family2 = new HColumnDescriptor("info2"); // 列族
		// 将列族添加到表中
		desc.addFamily(family2);
		// 创建表
		admin.createTable(desc); // 创建表
	}
    
	/**
	 * 删除一个表
	 * 
	 * @throws Exception
	 */
	@Test
	@SuppressWarnings("deprecation")
	public void deleteTable() throws MasterNotRunningException,
			ZooKeeperConnectionException, Exception {
		HBaseAdmin admin = new HBaseAdmin(config);
		admin.disableTable("test4");
		admin.deleteTable("test4");
		admin.close();
	}

	/**
	 * 向hbase中增加数据
	 * 
	 * @throws Exception
	 */
	@SuppressWarnings({ "deprecation", "resource" })
	@Test
	public void insertData() throws Exception {
		
		Put put = new Put("wangsanfeng_1234".getBytes());
		put.add("info".getBytes(),"name".getBytes(),"zhangsan".getBytes());
		put.add("info".getBytes(),"age".getBytes(),"23".getBytes());
		put.add("info".getBytes(),"sex".getBytes(),"nan".getBytes());
		put.add("info".getBytes(),"address".getBytes(),"safa".getBytes());
	
		table.put(put);
		
		}
}

四、HBase批量写入数据

	/**
	 * 配置类
	 */
	public class ServerConfigs {
	
		public static final String ZK = "10.68.24.161:2181";//zookeper配置
	    
	    public static final String TOPIC = "ad_upload_event";
	 
	    //public static final String BROKER_LIST = "172.17.245.23:9092";//kalfa配置
	 
	    public static final String GROUP_ID = "test_group";
	 
	    public static final int CACHE_LIST_SIZE = 100; //批量提交数据条数
	}
	
	/**
      * 工具类
      */
	public class HBaseUtils {
	    
		ThreadLocal<List<Put>> threadLocal = new ThreadLocal<List<Put>>();
	    HBaseAdmin admin = null;
	    Connection conn = null;
	    
	    /**
	     * 根据表名获取到HTable实例
	     */
	    public HTable getTable(String tableName) {
	 
	        HTable table = null;
	        try {
	            // table = new HTable(configuration, tableName);
	            final TableName tname = TableName.valueOf(tableName);
	            table = (HTable) conn.getTable(tname);
	 
	        } catch (IOException e) {
	            e.printStackTrace();
	        }
	 
	        return table;
	    }
	    
	    /**
	     * 添加单条记录到HBase表
	     *
	     * @param tableName HBase表名
	     * @param rowkey    HBase表的rowkey
	     * @param cf        HBase表的columnfamily
	     * @param column    HBase表的列key
	     * @param value     写入HBase表的值value
	     */
	    public void put(String tableName, String rowkey, String cf, String column, String value) {
	 
	        HTable table = getTable(tableName);
	        Put put = new Put(Bytes.toBytes(rowkey));
	        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
	        try {
	            table.put(put);
	        } catch (IOException e) {
	            e.printStackTrace();
	        }
	    }
	    
	    
	    /**
	     * 批量添加记录到HBase表,同一线程要保证对相同表进行添加操作!
	     *
	     * @param tableName HBase表名
	     * @param rowkey    HBase表的rowkey
	     * @param cf        HBase表的columnfamily
	     * @param column    HBase表的列key
	     * @param value     写入HBase表的值value
	     */
	    public void bulkput(String tableName, String rowkey, String cf, String column, String value) {
	        try {
	            List<Put> list = threadLocal.get();
	            if (list == null) {
	                list = new ArrayList<Put>();
	            }
	            Put put = new Put(Bytes.toBytes(rowkey));
	            put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
	            list.add(put);
	            if (list.size() >= ServerConfigs.CACHE_LIST_SIZE) {
	                HTable table = getTable(tableName);
	                table.put(list);
	                list.clear();
	            } else {
	                threadLocal.set(list);
	            }
	       //  table.flushCommits();
	        } catch (IOException e) {
	            e.printStackTrace();
	        }
	    }
	 
	
	    private static HBaseUtils instance = null;
	    
	    public static synchronized HBaseUtils getInstance() {
	        if (null == instance) {
	            instance = new HBaseUtils();
	        }
	        return instance;
	    }
	    
	    
	    private HBaseUtils() {
	        Configuration configuration = new Configuration();
	        configuration.set("hbase.zookeeper.quorum", ServerConfigs.ZK);
	        configuration.set("hbase.rootdir", "hdfs://10.68.24.161:8020/hbase");
	 
	        try {
	            conn = ConnectionFactory.createConnection(configuration);
	            admin = new HBaseAdmin(configuration);
	        } catch (IOException e) {
	            e.printStackTrace();
	        }
	    }
	
	}
	
    /**
      * 双线程,插入20W条数据,用了20s
      */
    public class Test {
		  
       public static void main(String[] args) {
 
	       //HTable table = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
	       //System.out.println(table.getName().getNameAsString());
	       long start = System.currentTimeMillis();
	
	       String tableName = "test4";
	       // String rowkey = "1";
	       for (int i = 0; i < 100000; i++) {
	           HBaseUtils.getInstance().bulkput(tableName, i + "", "info", "name", String.valueOf(100321 + i));
	       }
	       new Thread(new Runnable() {
	           public void run() {
	               for (int i = 100000; i < 200000; i++) {
	                   HBaseUtils.getInstance().bulkput("test4", i + "", "info", "name", String.valueOf(100321 + i));
	               }
	           }
	       }).start();
	 
	       System.out.println(System.currentTimeMillis() - start);
	   }
	 
}

www.htsjk.Com true http://www.htsjk.com/hbase/36357.html NewsArticle HBase批量写入数据, HBase是一个高可靠性、高性能、列式分布式数据库,HBase基于HDFS来存储并处理大型的数据,没有update,但可以追加合并数据;可以有多个主节点 Hmaster, 可以有多个从...
相关文章
    暂无相关文章
评论暂时关闭