// HBase utility class.
package bj.sinorail.utils
import java.io.IOException
import java.util
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
object HbaseUtils {
/* Cluster-environment configuration parameters (kept for reference, disabled):
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", MonitorConfiguration.ZOOKEEPER_HOST)
hbaseConf.set("hbase.zookeeper.property.clientPort", MonitorConfiguration.ZOOKEEPER_PORT)
hbaseConf.set("zookeeper.znode.parent", MonitorConfiguration.HBASE_PATH)
hbaseConf.set("hbase.rpc.timeout", "600000")
hbaseConf.set("hbase.client.scanner.timeout.period", "600000")
*/
//得到连接conn
def getConnection: Connection = {
val hbaseConf = HBaseConfiguration.create()
//hbaseConf.set(“hbase.zookeeper.quorum”, “hadoop31,hadoop32,hadoop33”)
hbaseConf.set(“hbase.zookeeper.quorum”, “hadoop4:2181,hadoop5:2181,hadoop6:2181,hadoop7:2181,hadoop8:2181”)
hbaseConf.set(“hbase.zookeeper.property.clientPort”, “2181”)
hbaseConf.set(“zookeeper.znode.parent”, “/hbase”)
hbaseConf.set(“hbase.rpc.timeout”, “600000”)
hbaseConf.set(“hbase.client.scanner.timeout.period”, “600000”)
val conn: Connection = ConnectionFactory.createConnection(hbaseConf)
conn
}
def getConf(): Configuration ={
val hbaseConf: Configuration = HBaseConfiguration.create()
//hbaseConf.set(“hbase.zookeeper.quorum”, “hadoop31,hadoop32,hadoop33”)
hbaseConf.set(“hbase.zookeeper.quorum”, “hadoop4:2181,hadoop5:2181,hadoop6:2181,hadoop7:2181,hadoop8:2181”)
hbaseConf.set(“hbase.zookeeper.property.clientPort”, “2181”)
hbaseConf.set(“zookeeper.znode.parent”, “/hbase”)
hbaseConf.set(“hbase.rpc.timeout”, “600000”)
hbaseConf.set(“hbase.client.scanner.timeout.period”, “600000”)
hbaseConf
}
//********************************************判断区
/ **************** 判断表是否存在
* 判断表是否存在
*
* @param tableName 表名
* @return 是否建立成功
*/
def isTableExist(conn: Connection,tableName: TableName): Boolean = {
val admin = conn.getAdmin
try {
if (admin.tableExists(tableName)) true else false
} catch {
case ex: IOException =>
ex.printStackTrace()
false
} finally {
try {
admin.close()
} catch {
case ex: IOException => ex.printStackTrace()
}
}
}
//判断表存在的重载
def isTableExist(conn: Connection,tablename: String): Boolean = {
val admin = conn.getAdmin
val tableName = TableName.valueOf(tablename)
try {
if (admin.tableExists(tableName)) true else false
} catch {
case ex: IOException =>
ex.printStackTrace()
false
} finally {
try {
admin.close()
} catch {
case ex: IOException => ex.printStackTrace()
}
}
}
//*********************************************创建区
/ *********** 创建表
* 创建一张表, 当表已经存在的时候,抛出TableExistsException
*
* @param tableName 表名
* @param columnFamilies 列簇名
*/
def createTable(conn: Connection,tableName: String, columnFamilies: Array[String]) = {
val admin = conn.getAdmin
val table = TableName.valueOf(tableName)
val tableDesc = new HTableDescriptor(table)
for (columnFamily <- columnFamilies) {
tableDesc.addFamily(new HColumnDescriptor(columnFamily.getBytes))
}
try {
admin.createTable(tableDesc)
} catch {
case ex: IOException => {
ex.printStackTrace()
// LOG.debug(ex,ex.fillInStackTrace())
}
} finally {
try {
admin.close()
} catch {
case ex: IOException => ex.printStackTrace()
}
}
}
/**
* ********** 创建命名空间
*
* @param namespace
*/
def createNamespace(conn: Connection,namespace: String) = {
val admin = conn.getAdmin
try {
val nameSpace = NamespaceDescriptor.create(namespace).build()
admin.createNamespace(nameSpace)
} catch {
case ex: IOException => ex.printStackTrace()
} finally {
try {
admin.close()
} catch {
case ex: IOException => ex.printStackTrace()
}
}
}
//**********************************************删除区
/**
* 删除命令空间
*
* @param namespace
*/
def dropNamespace(conn: Connection,namespace: String) = {
val admin = conn.getAdmin
try {
admin.deleteNamespace(namespace)
} catch {
case ex: IOException => ex.printStackTrace()
} finally {
try {
admin.close()
} catch {
case ex: Exception => ex.printStackTrace()
}
}
}
/** ************* 删除一张表
* 删除一张表, 删除的表不存在的时候会抛出TableNotFoundException
*
* @param tableName 表名
*/
def dropTable(conn: Connection,tableName: String) = {
val admin = conn.getAdmin
val table = TableName.valueOf(tableName)
try {
admin.disableTable(table)
admin.deleteTable(table)
} catch {
case ex: IOException => {
// LOG.debug(ex,ex.fillInStackTrace())
ex.printStackTrace()
}
} finally {
try {
admin.close()
} catch {
case ex: Exception => ex.printStackTrace()
}
}
}
/**
* **************** 删除 一条/一列/一列簇 的数据
*
* @param tableName 表名
* @param rowkey 行键
* @param limit 可选参数, [列簇,列] 如果不加最后一个参数,将按 rowkey 删除
/
def deleteData(conn: Connection,tableName: String, rowkey: String, limit: String) = {
val table = TableName.valueOf(tableName)
var currentTable: Table = null
try {
currentTable = conn.getTable(table)
val del = new Delete(Bytes.toBytes(rowkey))
if (limit.length > 2) {
throw new IllegalArgumentException(“number of argument is not right”)
} else if (limit.length == 2) {
del.addColumn(Bytes.toBytes(limit(0)), Bytes.toBytes(limit(1)))
} else if (limit.length == 1) {
del.addFamily(Bytes.toBytes(limit(0)))
}
currentTable.delete(del)
} catch {
case ex: Exception => {
// LOG.debug(ex,ex.fillInStackTrace())
ex.printStackTrace()
}
} finally {
try {
currentTable.close()
} catch {
case ex: Exception => ex.printStackTrace()
}
}
}
//重载
def deleteData(conn: Connection,tableName: String, rowkey: Int, limit: String*) = {
val table = TableName.valueOf(tableName)
var currentTable: Table = null
try {
currentTable = conn.getTable(table)
val del = new Delete(Bytes.toBytes(rowkey))
if (limit.length > 2) {
throw new IllegalArgumentException(“number of argument is not right”)
} else if (limit.length == 2) {
del.addColumn(Bytes.toBytes(limit(0)), Bytes.toBytes(limit(1)))
} else if (limit.length == 1) {
del.addFamily(Bytes.toBytes(limit(0)))
}
currentTable.delete(del)
} catch {
case ex: Exception => {
// LOG.debug(ex,ex.fillInStackTrace())
ex.printStackTrace()
}
} finally {
try {
currentTable.close()
} catch {
case ex: Exception => ex.printStackTrace()
}
}
}
/**
* ************* 基于rowkey范围删除数据
*
* @param table
* @param startRowKey
* @param endRowKey
*/
def batchDeleteData(conn: Connection,table: String, startRowKey: String, endRowKey: String) = {
val scan = new Scan
scan.setStartRow(Bytes.toBytes(startRowKey))
scan.setStopRow(Bytes.toBytes(endRowKey))
val list = new util.ArrayListDelete
var scanner: ResultScanner = null
var currentTable: Table = null
try {
currentTable = conn.getTable(TableName.valueOf(table))
scanner = currentTable.getScanner(scan)
val scIt = scanner.iterator()
while (scIt.hasNext) {
val del = new Delete(scIt.next.getRow)
list.add(del)
if (list.size() >= 4096) {
currentTable.delete(list)
list.clear()
}
}
currentTable.delete(list)
} catch {
case ex: IOException => ex.printStackTrace()
} finally {
try {
scanner.close()
currentTable.close()
} catch {
case ex: IOException => ex.printStackTrace()
}
}
}
//********************************************查询区
/ ************* 得到一行完整数据
* get 一条行键 = rowkey 的数据
*
* @param tableName
* @param rowkey
* @return
*/
def getOneRowData(conn: Connection,tableName: String, rowkey: String): Result = {
val table = TableName.valueOf(tableName)
var result: Result = null
var currentTable: Table = null
try {
currentTable = conn.getTable(table)
val row = new Get(Bytes.toBytes(rowkey))
result = currentTable.get(row)
result
} catch {
case ex: Exception => {
// LOG.debug(ex,ex.fillInStackTrace())
ex.printStackTrace()
result
}
} finally {
try {
currentTable.close()
} catch {
case ex: IOException => ex.printStackTrace()
}
}
}
/**
* ****************** 查询一定范围的rowkey
*
* @param tablename
* @param startrow
* @param stoprow
* @return
*/
def scanRangeRow(conn: Connection,tablename: String, startrow: String, stoprow: String): ResultScanner = {
var scanner: ResultScanner = null
try {
val table = conn.getTable(TableName.valueOf(tablename))
val scan = new Scan()
scan.setStartRow(Bytes.toBytes(startrow))
scan.setStopRow(Bytes.toBytes(stoprow))
val scanner = table.getScanner(scan)
scanner
} catch {
case ex: IOException => ex.printStackTrace()
scanner
}
}
/**
* **************** 查询一定范围rowkey,并添加过滤条件
*
* @param tablename
* @param scan
* @return
*/
def scanRangeRow(conn: Connection,tablename: String, scan: Scan): ResultScanner = {
var scanner: ResultScanner = null
try {
val table = conn.getTable(TableName.valueOf(tablename))
scanner = table.getScanner(scan)
scanner
} catch {
case ex: IOException => ex.printStackTrace()
scanner
}
}
/************************ 使用spark读取hbase的数据
*
* 传入的rdd所使用的sparkconf需要使用KryoSerializer序列化
* 可选项:sparkConf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
*/
def sparkFromHbase(hbaseConf:Configuration,tablename:String,sc:SparkContext): RDD[Result] = {
hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)
//读取数据并转化成rdd
var rdd:RDD[Result]=null
try {
val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
rdd =hBaseRDD.map(x=>x._2)
rdd
} catch {
case ex:Exception =>ex.printStackTrace()
rdd
}
}
//*************************************************插入区
/** *********** 向某个表插入单条数据
*
* @param tableName 表名
* @param rowkey 行键
* @param cf 列簇
* @param qualifier 列
* @return
*/
def insertData(conn: Connection,tableName: String, rowkey: String, cf: String, qualifier: String, value: String) = {
val table = TableName.valueOf(tableName)
var currentTable: Table = null
try {
currentTable = conn.getTable(table)
val put = new Put(Bytes.toBytes(rowkey))
put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(qualifier), Bytes.toBytes(value))
currentTable.put(put)
} catch {
case ex: Exception => {
// LOG.debug(ex,ex.fillInStackTrace())
ex.printStackTrace()
}
} finally {
try {
currentTable.close()
} catch {
case ex: IOException => ex.printStackTrace()
}
}
}
/**
* ********** 向某个表中批量插入数据
*
* @param tableName 表名
* @param puts 要插入的Puts
*/
def putBatchData(conn: Connection,tableName: String, puts: util.List[Put]) = {
val table = TableName.valueOf(tableName)
var currentTable: Table = null
try {
currentTable = conn.getTable(table)
currentTable.put(puts)
} catch {
case ex: IOException => ex.printStackTrace()
} finally {
try {
currentTable.close()
} catch {
case ex: IOException => ex.printStackTrace()
}
}
}
/**
* ******************** 用spark向hbase写入数据
*
* @param tablename
* @param rdd :RDD[Put]
/
def sparkToHbasemapreduce(hbaseConf:Configuration,tablename: String, rdd: RDD[Put]) = {
/*
* 传入的rdd所使用的sparkconf需要使用KryoSerializer序列化
* 必选项:sparkconf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)
* 可选项:sparkconf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
*/
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
var job: Job=null
try {
job= Job.getInstance(hbaseConf)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
rdd.map(x => (new ImmutableBytesWritable, x))
.saveAsNewAPIHadoopDataset(job.getConfiguration)
} catch {
case ex:Exception =>ex.printStackTrace()
}
}
/**
* ******************** 用spark向hbase写入数据
* 使用旧的mapred
*
* @param tablename
* @param rdd :RDD[Put]
*/
def sparkToHbaseMapred(hbaseConf:Configuration,tablename: String, rdd: RDD[Put]) = {
import org.apache.hadoop.hbase.mapred.TableOutputFormat
try {
val jobConf: JobConf = new JobConf(hbaseConf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
rdd.map(x => (new ImmutableBytesWritable, x))
.saveAsHadoopDataset(jobConf)
} catch {
case ex:Exception =>ex.printStackTrace()
}
}
/**
 * Placeholder: streaming operations and other utilities.
 */
}