HBase
一、HBase 架构
1.1、HMaster
功能:
1.2、RegionServer
功能:
组件:
1.3、HBase 的架构
HBase 内置有 ZooKeeper,但一般我们会使用独立的 ZooKeeper 集群来监管 HMaster 和 RegionServer。ZooKeeper 的作用包括:通过选举保证任何时候集群中只有一个活跃的 HMaster;HMaster 与 HRegionServer 启动时会向 ZooKeeper 注册;存储所有 HRegion 的寻址入口;实时监控 HRegionServer 的上线和下线信息,并实时通知给 HMaster;存储 HBase 的 schema 和 table 元数据。默认情况下,HBase 自己管理 ZooKeeper 实例。ZooKeeper 的引入使得 HMaster 不再是单点故障。一般情况下会启动两个 HMaster,非 Active 的 HMaster 会定期地和 Active HMaster
通信以获取其最新状态,从而保证它是实时更新的,因而如果启动了过多的 Master 反而会增加 Active HMaster 的负担。一个 RegionServer 可以包含多个 Region,每个 RegionServer 维护一个 HLog,以及多个 HFile 和其对应的 MemStore。RegionServer 运行于 DataNode 上,数量可以与 DataNode 数量一致。
请参考如下架构图:
1.4、HBase 物理存储结构
二、表的操作
- 创建表
hbase(main) > create 'student','info'
- 插入数据到表
hbase(main) > put 'student','1001','info:name','Thomas'
hbase(main) > put 'student','1001','info:sex','male'
hbase(main) > put 'student','1001','info:age','18'
hbase(main) > put 'student','1002','info:name','Janna'
hbase(main) > put 'student','1002','info:sex','female'
hbase(main) > put 'student','1002','info:age','20'
- 扫描查看表数据
hbase(main) > scan 'student'
hbase(main) > scan 'student',{STARTROW => '1001', STOPROW => '1001'}
hbase(main) > scan 'student',{STARTROW => '1001'}
- 查看表结构
hbase(main) > describe 'student'
- 更新指定字段的数据
hbase(main) > put 'student','1001','info:name','Nick'
hbase(main) > put 'student','1001','info:age','100'
- 查看“指定行”或“指定列族:列”的数据
hbase(main) > get 'student','1001'
hbase(main) > get 'student','1001','info:name'
- 删除数据
删除某 rowkey 的全部数据:
hbase(main) > deleteall 'student','1001'
删除某 rowkey 的某一列数据:
hbase(main) > delete 'student','1002','info:sex'
- 清空表数据
hbase(main) > truncate 'student'
尖叫提示:truncate 命令内部会依次执行 disable、drop 并重建表,因此无需事先手动 disable。
- 删除表
首先需要先让该表为 disable 状态:
hbase(main) > disable 'student'
然后才能 drop 这个表:
hbase(main) > drop 'student'
尖叫提示:如果直接 drop 表,会报错:Drop the named table. Table must first be disabled
ERROR: Table student is enabled. Disable it first.
- 统计表数据行数
hbase(main) > count 'student'
- 变更表信息
将 info 列族中的数据存放 3 个版本:
hbase(main) > alter 'student',{NAME=>'info',VERSIONS=>3}
三、读写过程
1、读写流程
四、API 操作
1、新建项目后在 pom.xml 中添加依赖:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.0</version>
</dependency>
2、操作代码(Java 版与 Scala 版)
package hbase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class HbaseApi {
    /** Shared client configuration; every helper below opens its own short-lived connection from it. */
    public static Configuration conf;

    static {
        // HBaseConfiguration.create() returns a fresh Configuration instance (it is not a singleton).
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master01");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }

    /**
     * Creates a namespace.
     *
     * @param namespaceNameStr name of the namespace to create
     * @throws IOException if connecting fails or the namespace cannot be created
     */
    public static void createNamespace(String namespaceNameStr) throws IOException {
        // Connection and Admin are Closeable; try-with-resources releases them even on failure.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            NamespaceDescriptor nd = NamespaceDescriptor.create(namespaceNameStr).build();
            admin.createNamespace(nd);
        }
        System.out.println("创建命名空间" + namespaceNameStr + "成功");
    }

    /**
     * Deletes a namespace.
     *
     * @param namespaceNameStr name of the namespace to delete
     * @throws IOException if connecting fails or the namespace cannot be deleted
     */
    public static void deleteNamespace(String namespaceNameStr) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            admin.deleteNamespace(namespaceNameStr);
        }
        System.out.println("删除命名空间" + namespaceNameStr + "成功");
    }

    /**
     * Prints the descriptor of every namespace in the cluster.
     *
     * @throws IOException if connecting or listing fails
     */
    public static void scanAllNamespace() throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            for (NamespaceDescriptor descriptor : admin.listNamespaceDescriptors()) {
                System.out.println("命名空间:" + descriptor);
            }
        }
    }

    /**
     * Checks whether a table exists, using the Connection/Admin API.
     *
     * @param tableName table name (optionally "namespace:table")
     * @return true if the table exists
     * @throws IOException if connecting fails
     */
    public static boolean isTableExist(String tableName) throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            return admin.tableExists(TableName.valueOf(tableName));
        }
    }

    /**
     * Checks whether a table exists, using the older HBaseAdmin API.
     * Kept for comparison with {@link #isTableExist(String)}.
     *
     * @param tableName table name
     * @return true if the table exists
     * @throws IOException if connecting fails
     */
    @SuppressWarnings("deprecation")
    public static boolean isTableExistOld(String tableName) throws IOException {
        // HBaseAdmin(conf) opens its own connection; closing the admin releases it.
        try (HBaseAdmin admin = new HBaseAdmin(conf)) {
            return admin.tableExists(tableName);
        }
    }

    /**
     * Creates a table with the given column families. If the table already exists, it only prints a message.
     * (Admin manages tables; Table is used for data access.)
     *
     * @param tableName    table name
     * @param columnFamily one or more column family names
     * @throws IOException if connecting fails or HBase rejects the table definition
     */
    public static void createTable(String tableName, String... columnFamily) throws IOException {
        TableName name = TableName.valueOf(tableName);
        // A single connection/admin is used for both the existence check and the creation.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            if (admin.tableExists(name)) {
                System.out.println("表" + tableName + "已存在");
                return;
            }
            // The table descriptor only needs the column families; columns are defined at write time.
            HTableDescriptor descriptor = new HTableDescriptor(name);
            for (String cf : columnFamily) {
                descriptor.addFamily(new HColumnDescriptor(cf));
            }
            admin.createTable(descriptor);
        }
        System.out.println("表" + tableName + "创建成功!");
    }

    /**
     * Disables and deletes a table if it exists.
     *
     * @param tableName table name
     * @throws IOException if connecting fails or the table cannot be removed
     */
    public static void dropTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Admin admin = connection.getAdmin()) {
            if (!admin.tableExists(name)) {
                System.out.println("表" + tableName + "不存在!");
                return;
            }
            // A table must be disabled before it can be deleted; disabling an
            // already-disabled table throws, so only disable when it is enabled.
            if (admin.isTableEnabled(name)) {
                admin.disableTable(name);
            }
            admin.deleteTable(name);
        }
        System.out.println("表" + tableName + "删除成功!");
    }

    /**
     * Inserts (or overwrites) a single cell.
     *
     * @param tableName    table name
     * @param rowKey       row key
     * @param columnFamily column family
     * @param column       column qualifier
     * @param value        cell value
     * @throws IOException if connecting or writing fails
     */
    public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value)
            throws IOException {
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            // Row key, family, qualifier and value are all stored as byte arrays (UTF-8 via Bytes.toBytes).
            Put put = new Put(Bytes.toBytes(rowKey));
            // addColumn replaces the deprecated Put.add(family, qualifier, value).
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
            // Table.put also accepts a List<Put> for batch inserts.
            table.put(put);
        }
        System.out.println("插入数据成功");
    }

    /**
     * Deletes whole rows in one batch call.
     *
     * @param tableName table name
     * @param rows      row keys to delete (overload as needed for other use cases)
     * @throws IOException if connecting or deleting fails
     */
    public static void deleteMultiRow(String tableName, String... rows) throws IOException {
        // Table.delete(List) may remove applied entries from the list, so it must be a mutable list.
        List<Delete> deleteList = new ArrayList<Delete>(rows.length);
        for (String row : rows) {
            deleteList.add(new Delete(Bytes.toBytes(row)));
        }
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.delete(deleteList);
        }
        System.out.println("删除数据成功");
    }

    /**
     * Scans the whole table and prints every cell (row key, family, qualifier, value).
     *
     * @param tableName table name
     * @throws IOException if connecting or scanning fails
     */
    public static void getAllRows(String tableName) throws IOException {
        // scan.setMaxVersions() would return all stored versions instead of only the latest.
        Scan scan = new Scan();
        // The scanner holds server-side resources; it is closed before the table and connection.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner resultScanner = table.getScanner(scan)) {
            for (Result result : resultScanner) {
                for (Cell cell : result.rawCells()) {
                    System.out.println("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
                    System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
                    System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
                    System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
                }
            }
        }
    }

    /**
     * Prints every cell of one row, including each cell's timestamp.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @throws IOException if connecting or reading fails
     */
    public static void getRow(String tableName, String rowKey) throws IOException {
        Get get = new Get(Bytes.toBytes(rowKey));
        // get.setMaxVersions() returns all versions; get.setTimeStamp(ts) returns a specific version.
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(get);
            for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + Bytes.toString(result.getRow()));
                System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
                System.out.println("时间戳:" + cell.getTimestamp());
            }
        }
    }

    /**
     * Prints a single "family:qualifier" cell of one row.
     *
     * @param tableName table name
     * @param rowKey    row key
     * @param family    column family
     * @param qualifier column qualifier
     * @throws IOException if connecting or reading fails
     */
    public static void getRowQualifier(String tableName, String rowKey, String family, String
            qualifier) throws IOException {
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table table = connection.getTable(TableName.valueOf(tableName))) {
            Result result = table.get(get);
            for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + Bytes.toString(result.getRow()));
                System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        /* scanAllNamespace();
        createNamespace("aa");
        scanAllNamespace();
        deleteNamespace("aa");*/
        // 1. isTableExist
        // System.out.println(isTableExist("TEST_HBASE_PHOENIX"));
        // createTable("staffe", "cf1", "cf2");
        // dropTable("staffe");
        // System.out.println(isTableExist("staffe"));
        // addRowData("staffe", "1001", "cf1", "name", "nick");
        // addRowData("staffe", "1002", "cf1", "name", "1002");
        // addRowData("staffe", "1003", "cf1", "name", "1003");
        // deleteMultiRow takes row keys only (no column family):
        // deleteMultiRow("staffe", "1001", "1002");
        // getAllRows("staffe");
        // getRow("staffe", "1001");
        // getRowQualifier("staffe", "1001", "cf1", "name");
    }
}
package hbase
import java.io.IOException
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Delete, HBaseAdmin, HTable, Put, ResultScanner, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.mutable.ListBuffer
object HbaseScalaUtilsApi {
  // HBase client configuration: ZooKeeper port/quorum, master host and the HBase root znode.
  val hBaseconf = HBaseConfiguration.create
  hBaseconf.set("hbase.zookeeper.property.clientPort", "2181")
  hBaseconf.set("hbase.zookeeper.quorum", "master01,slave01,slave02")
  hBaseconf.set("hbase.master", "master01")
  hBaseconf.set("zookeeper.znode.parent", "/hbase")
  // One shared connection for the whole object; released by close().
  val connection = ConnectionFactory.createConnection(hBaseconf)
  // Admin handle for DDL (exists/create/drop); also released by close().
  val hAdmin = connection.getAdmin

  /**
   * Opens the named table, runs `f` on it and always closes the table afterwards.
   * Table handles are lightweight but must be closed; the shared connection stays open.
   */
  private def withTable[T](tableName: String)(f: Table => T): T = {
    val table = connection.getTable(TableName.valueOf(tableName))
    try f(table) finally table.close()
  }

  /**
   * Checks whether a table exists and prints the result.
   *
   * @param tableName table name
   * @return true if the table exists
   */
  def isTableExist(tableName: String): Boolean = {
    val exists = hAdmin.tableExists(TableName.valueOf(tableName))
    println(if (exists) "表已经存在" else "表不存在")
    exists
  }

  /**
   * Creates a table with the given column families if it does not exist yet.
   *
   * @param tableName     table name
   * @param columnFamilys column family names
   */
  def createTable(tableName: String, columnFamilys: Array[String]): Unit = {
    val tName = TableName.valueOf(tableName)
    if (!hAdmin.tableExists(tName)) {
      val descriptor = new HTableDescriptor(tName)
      // A table definition only needs its column families; columns are defined at write time.
      columnFamilys.foreach(cf => descriptor.addFamily(new HColumnDescriptor(cf)))
      hAdmin.createTable(descriptor)
      println("create successful!!")
    } else {
      println("create fail!!")
    }
  }

  /**
   * Disables (if necessary) and deletes a table.
   *
   * @param tableName table name
   * @return true if the table existed and was dropped, false if it did not exist
   */
  def dropTable(tableName: String): Boolean = {
    val tName = TableName.valueOf(tableName)
    if (hAdmin.tableExists(tName)) {
      // A table must be disabled before deletion; disabling a disabled table throws.
      if (hAdmin.isTableEnabled(tName)) hAdmin.disableTable(tName)
      hAdmin.deleteTable(tName)
      println("drop successful!!")
      true
    } else {
      println("drop fail!!")
      false
    }
  }

  /**
   * Puts a single cell.
   *
   * @param tableName    table name
   * @param rowkey       row key
   * @param columnFamily column family
   * @param column       column qualifier
   * @param value        cell value
   */
  def addRowData(tableName: String, rowkey: String, columnFamily: String, column: String, value: String): Unit = {
    withTable(tableName) { table =>
      // Bytes.toBytes encodes as UTF-8, matching Bytes.toString used when reading back
      // (String.getBytes() would depend on the platform default charset).
      val put = new Put(Bytes.toBytes(rowkey))
      put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value))
      table.put(put)
    }
    println("put successful!!")
  }

  /**
   * Prints the value of one "family:column" for every row that has that column.
   * Shell equivalent: scan 'staffe', {COLUMNS => 'cf1:name'}
   *
   * @param tableName    table name
   * @param columnFamily column family
   * @param column       column qualifier
   */
  def getAllColumnRows(tableName: String, columnFamily: String, column: String): Unit = {
    val family = Bytes.toBytes(columnFamily)
    val qualifier = Bytes.toBytes(column)
    withTable(tableName) { table =>
      val scan = new Scan()
      // Only fetch the requested column instead of the whole family.
      scan.addColumn(family, qualifier)
      val scanner = table.getScanner(scan)
      // The scanner holds RegionServer-side resources, so it is always closed.
      try {
        Iterator.continually(scanner.next()).takeWhile(_ != null).foreach { result =>
          println(s"rowkey:${Bytes.toString(result.getRow)},列簇:${columnFamily}:${column},value:${Bytes.toString(result.getValue(family, qualifier))}")
        }
      } finally scanner.close()
    }
  }

  /**
   * Reads every cell of the table and formats it as a line of text.
   * IOExceptions are printed, and whatever was collected before the failure is returned.
   *
   * @param tableName table name
   * @return one formatted string per cell
   */
  def getAllRows(tableName: String): ListBuffer[String] = {
    import scala.collection.JavaConverters._
    val list = new ListBuffer[String]
    try {
      withTable(tableName) { table =>
        val results: ResultScanner = table.getScanner(new Scan)
        try {
          for (result <- results.asScala; cell <- result.rawCells) {
            val row = Bytes.toString(cell.getRowArray, cell.getRowOffset, cell.getRowLength)
            val family = Bytes.toString(cell.getFamilyArray, cell.getFamilyOffset, cell.getFamilyLength)
            val colName = Bytes.toString(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
            val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
            list += "rowkey:" + row + "," + "列族:" + family + "," + "列:" + colName + "," + "值:" + value
          }
        } finally results.close()
      }
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
    list
  }

  /**
   * Deletes one column of a row (all stored versions, like the shell command).
   * Shell equivalent: delete 'staffe', '1001', 'cf1:name'
   *
   * @param tableName    table name
   * @param rowkey       row key
   * @param columnFamily column family
   * @param column       column qualifier
   */
  def deleteRecord(tableName: String, rowkey: String, columnFamily: String, column: String): Unit = {
    withTable(tableName) { table =>
      val delete = new Delete(Bytes.toBytes(rowkey))
      // addColumns removes every version; addColumn would only remove the newest one,
      // exposing older versions of tables with VERSIONS > 1.
      delete.addColumns(Bytes.toBytes(columnFamily), Bytes.toBytes(column))
      table.delete(delete)
    }
    println("delete successful!!")
  }

  /**
   * Deletes whole rows in a single batch call.
   *
   * @param tableName table name
   * @param rows      row keys to delete
   */
  def delMultiRows(tableName: String, rows: Array[String]): Unit = {
    // Table.delete(List) may remove applied entries from the list, so a mutable Java list is required.
    val deleteList = new java.util.ArrayList[Delete](rows.length)
    rows.foreach(row => deleteList.add(new Delete(Bytes.toBytes(row))))
    withTable(tableName)(_.delete(deleteList))
    println("删除多行成功")
  }

  /** Closes the shared Admin and Connection; the connection is closed even if closing the admin fails. */
  def close(): Unit = {
    try {
      try hAdmin.close() finally connection.close()
      println("关闭成功!")
    } catch {
      case _: IOException => println("关闭失败!")
    }
  }

  def main(args: Array[String]): Unit = {
    // isTableExist("tes2")
    // createTable("staffe", Array("name", "age", "cf1"))
    // dropTable("staffe")
    addRowData("staffe", "1001", "cf1", "name", "nick")
    addRowData("staffe", "1002", "cf1", "name", "1002")
    addRowData("staffe", "1003", "cf1", "name", "1003")
    getAllColumnRows("staffe", "cf1", "name")
    /* val allRows: ListBuffer[String] = getAllRows("staffe")
    allRows.foreach(x => {
      println(x)
    })*/
    // deleteRecord("staffe", "1001", "cf1", "name")
    // delMultiRows("staffe", Array("1002", "1003"))
    close()
  }
}