HBase批量写入数据,
HBase是一个高可靠性、高性能、列式分布式数据库,HBase基于HDFS来存储并处理大型的数据,没有update,但可以追加合并数据;可以有多个主节点 Hmaster, 可以有多个从节点 Hregionserver, 是N+N关系;特点是写快读慢。在需要实时读写,随机访问超大规模数据集时,可以使用HBase.
一、HBase与行式数据区别:
1、行式数据库是一行一行的查数据,进行匹配;列式数据库是把行式数据库的列转换为了行,没有多表关联,没有事务,只能存海量数据。
2、HBase一张表可能达到上亿行,上百万列;对于为空(null)的列,不占用存储空间,稀疏性很好;面向列(族)的存储和权限控制,针对列(族)进行独立检索。
二、HBase 与HDFS关系
当一条一条数据插入时,HDFS的namenode会吃力,所以先缓存到HBASE(Hmaster/Hregionserver 主从)的Hregionserver内存, 当达到128M,再写到hdfs。
HDFS最适于执行批次分析。然而,它最大的缺点是无法执行实时分析,而实时分析是信息科技行业的标配。HBase它不适于批次分析,但能够处理大规模数据,可以向Hadoop实时地调用数据。
三、HBase单条写入数据
public class HbaseTest {
/**
* 配置ss
*/
static Configuration config = null;
private Connection connection = null;
private Table table = null;
@Before
public void init() throws Exception {
config = HBaseConfiguration.create();// 配置
config.set("hbase.zookeeper.quorum", "master1-vsp.com");// zookeeper地址
config.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口
connection = ConnectionFactory.createConnection(config);
table = connection.getTable(TableName.valueOf("test4"));//表名
}
/**
* 创建一个表
*
* @throws Exception
*/
@Test
public void createTable() throws Exception {
// 创建表管理类
HBaseAdmin admin = new HBaseAdmin(config); // hbase表管理
// 创建表描述类
TableName tableName = TableName.valueOf("test4"); // 表名称
HTableDescriptor desc = new HTableDescriptor(tableName);
// 创建列族的描述类
HColumnDescriptor family = new HColumnDescriptor("info"); // 列族
// 将列族添加到表中
desc.addFamily(family);
HColumnDescriptor family2 = new HColumnDescriptor("info2"); // 列族
// 将列族添加到表中
desc.addFamily(family2);
// 创建表
admin.createTable(desc); // 创建表
}
/**
* 删除一个表
*
* @throws Exception
*/
@Test
@SuppressWarnings("deprecation")
public void deleteTable() throws MasterNotRunningException,
ZooKeeperConnectionException, Exception {
HBaseAdmin admin = new HBaseAdmin(config);
admin.disableTable("test4");
admin.deleteTable("test4");
admin.close();
}
/**
* 向hbase中增加数据
*
* @throws Exception
*/
@SuppressWarnings({ "deprecation", "resource" })
@Test
public void insertData() throws Exception {
Put put = new Put("wangsanfeng_1234".getBytes());
put.add("info".getBytes(),"name".getBytes(),"zhangsan".getBytes());
put.add("info".getBytes(),"age".getBytes(),"23".getBytes());
put.add("info".getBytes(),"sex".getBytes(),"nan".getBytes());
put.add("info".getBytes(),"address".getBytes(),"safa".getBytes());
table.put(put);
}
}
四、HBase批量写入数据
/**
* 配置类
*/
public class ServerConfigs {
public static final String ZK = "10.68.24.161:2181";//zookeper配置
public static final String TOPIC = "ad_upload_event";
//public static final String BROKER_LIST = "172.17.245.23:9092";//kalfa配置
public static final String GROUP_ID = "test_group";
public static final int CACHE_LIST_SIZE = 100; //批量提交数据条数
}
/**
* 工具类
*/
public class HBaseUtils {
ThreadLocal<List<Put>> threadLocal = new ThreadLocal<List<Put>>();
HBaseAdmin admin = null;
Connection conn = null;
/**
* 根据表名获取到HTable实例
*/
public HTable getTable(String tableName) {
HTable table = null;
try {
// table = new HTable(configuration, tableName);
final TableName tname = TableName.valueOf(tableName);
table = (HTable) conn.getTable(tname);
} catch (IOException e) {
e.printStackTrace();
}
return table;
}
/**
* 添加单条记录到HBase表
*
* @param tableName HBase表名
* @param rowkey HBase表的rowkey
* @param cf HBase表的columnfamily
* @param column HBase表的列key
* @param value 写入HBase表的值value
*/
public void put(String tableName, String rowkey, String cf, String column, String value) {
HTable table = getTable(tableName);
Put put = new Put(Bytes.toBytes(rowkey));
put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
try {
table.put(put);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 批量添加记录到HBase表,同一线程要保证对相同表进行添加操作!
*
* @param tableName HBase表名
* @param rowkey HBase表的rowkey
* @param cf HBase表的columnfamily
* @param column HBase表的列key
* @param value 写入HBase表的值value
*/
public void bulkput(String tableName, String rowkey, String cf, String column, String value) {
try {
List<Put> list = threadLocal.get();
if (list == null) {
list = new ArrayList<Put>();
}
Put put = new Put(Bytes.toBytes(rowkey));
put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
list.add(put);
if (list.size() >= ServerConfigs.CACHE_LIST_SIZE) {
HTable table = getTable(tableName);
table.put(list);
list.clear();
} else {
threadLocal.set(list);
}
// table.flushCommits();
} catch (IOException e) {
e.printStackTrace();
}
}
private static HBaseUtils instance = null;
public static synchronized HBaseUtils getInstance() {
if (null == instance) {
instance = new HBaseUtils();
}
return instance;
}
private HBaseUtils() {
Configuration configuration = new Configuration();
configuration.set("hbase.zookeeper.quorum", ServerConfigs.ZK);
configuration.set("hbase.rootdir", "hdfs://10.68.24.161:8020/hbase");
try {
conn = ConnectionFactory.createConnection(configuration);
admin = new HBaseAdmin(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
}
/**
* 双线程,插入20W条数据,用了20s
*/
public class Test {
public static void main(String[] args) {
//HTable table = HBaseUtils.getInstance().getTable("imooc_course_clickcount");
//System.out.println(table.getName().getNameAsString());
long start = System.currentTimeMillis();
String tableName = "test4";
// String rowkey = "1";
for (int i = 0; i < 100000; i++) {
HBaseUtils.getInstance().bulkput(tableName, i + "", "info", "name", String.valueOf(100321 + i));
}
new Thread(new Runnable() {
public void run() {
for (int i = 100000; i < 200000; i++) {
HBaseUtils.getInstance().bulkput("test4", i + "", "info", "name", String.valueOf(100321 + i));
}
}
}).start();
System.out.println(System.currentTimeMillis() - start);
}
}
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如有侵犯了您的权益,请联系QQ:970679559,我们会在尽快处理。