Submissions welcome

Today's deep dive:

HBase utility class

package bj.sinorail.utils

import java.io.IOException
import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, NamespaceDescriptor, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object HbaseUtils {

  /* Cluster-environment configuration parameters (kept commented out):
  val hbaseConf = HBaseConfiguration.create()
  hbaseConf.set("hbase.zookeeper.quorum", MonitorConfiguration.ZOOKEEPER_HOST)
  hbaseConf.set("hbase.zookeeper.property.clientPort", MonitorConfiguration.ZOOKEEPER_PORT)
  hbaseConf.set("zookeeper.znode.parent", MonitorConfiguration.HBASE_PATH)
  hbaseConf.set("hbase.rpc.timeout", "600000")
  hbaseConf.set("hbase.client.scanner.timeout.period", "600000")
  */

  // Obtain a Connection
  def getConnection: Connection = {
    val hbaseConf = HBaseConfiguration.create()
    //hbaseConf.set("hbase.zookeeper.quorum", "hadoop31,hadoop32,hadoop33")
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop4:2181,hadoop5:2181,hadoop6:2181,hadoop7:2181,hadoop8:2181")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("zookeeper.znode.parent", "/hbase")
    hbaseConf.set("hbase.rpc.timeout", "600000")
    hbaseConf.set("hbase.client.scanner.timeout.period", "600000")
    val conn: Connection = ConnectionFactory.createConnection(hbaseConf)
    conn
  }

  // Build a Configuration for the Spark helpers
  def getConf(): Configuration = {
    val hbaseConf: Configuration = HBaseConfiguration.create()
    //hbaseConf.set("hbase.zookeeper.quorum", "hadoop31,hadoop32,hadoop33")
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop4:2181,hadoop5:2181,hadoop6:2181,hadoop7:2181,hadoop8:2181")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set("zookeeper.znode.parent", "/hbase")
    hbaseConf.set("hbase.rpc.timeout", "600000")
    hbaseConf.set("hbase.client.scanner.timeout.period", "600000")
    hbaseConf
  }
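
  /* Usage sketch (illustrative, not from the original article): obtain a
   * Connection for the Admin/Table helpers below, and a Configuration for
   * the Spark helpers; remember to close the connection when done.
   *
   *   val conn = HbaseUtils.getConnection
   *   val conf = HbaseUtils.getConf()
   *   // ... use the helpers ...
   *   conn.close()
   */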
  //******************************************** Existence checks

  /**
   * Check whether a table exists.
   *
   * @param tableName table name
   * @return true if the table exists
   */
  def isTableExist(conn: Connection, tableName: TableName): Boolean = {
    val admin = conn.getAdmin
    try {
      admin.tableExists(tableName)
    } catch {
      case ex: IOException =>
        ex.printStackTrace()
        false
    } finally {
      try {
        admin.close()
      } catch {
        case ex: IOException => ex.printStackTrace()
      }
    }
  }

  // Overload: check table existence by name string
  def isTableExist(conn: Connection, tablename: String): Boolean = {
    val admin = conn.getAdmin
    val tableName = TableName.valueOf(tablename)
    try {
      admin.tableExists(tableName)
    } catch {
      case ex: IOException =>
        ex.printStackTrace()
        false
    } finally {
      try {
        admin.close()
      } catch {
        case ex: IOException => ex.printStackTrace()
      }
    }
  }
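
  /* Usage sketch (illustrative; "t_demo" is a hypothetical table name). Note
   * that both overloads return false on an IOException as well as on a
   * missing table, so callers cannot distinguish the two cases.
   *
   *   val exists = HbaseUtils.isTableExist(conn, "t_demo")
   */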

  //********************************************* Creation

  /**
   * Create a table; throws TableExistsException when the table already exists.
   *
   * @param tableName      table name
   * @param columnFamilies column family names
   */
  def createTable(conn: Connection, tableName: String, columnFamilies: Array[String]): Unit = {
    val admin = conn.getAdmin
    val table = TableName.valueOf(tableName)
    val tableDesc = new HTableDescriptor(table)

    for (columnFamily <- columnFamilies) {
      tableDesc.addFamily(new HColumnDescriptor(columnFamily.getBytes))
    }

    try {
      admin.createTable(tableDesc)
    } catch {
      case ex: IOException =>
        ex.printStackTrace()
        //LOG.debug(ex, ex.fillInStackTrace())
    } finally {
      try {
        admin.close()
      } catch {
        case ex: IOException => ex.printStackTrace()
      }
    }
  }
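
  /* Usage sketch (illustrative; table and family names are hypothetical):
   * create the table only when it does not exist yet.
   *
   *   if (!HbaseUtils.isTableExist(conn, "t_demo"))
   *     HbaseUtils.createTable(conn, "t_demo", Array("cf1", "cf2"))
   */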

  /**
   * Create a namespace.
   *
   * @param namespace namespace name
   */
  def createNamespace(conn: Connection, namespace: String): Unit = {
    val admin = conn.getAdmin
    try {
      val nameSpace = NamespaceDescriptor.create(namespace).build()
      admin.createNamespace(nameSpace)
    } catch {
      case ex: IOException => ex.printStackTrace()
    } finally {
      try {
        admin.close()
      } catch {
        case ex: IOException => ex.printStackTrace()
      }
    }
  }

  //********************************************** Deletion

  /**
   * Delete a namespace.
   *
   * @param namespace namespace name
   */
  def dropNamespace(conn: Connection, namespace: String): Unit = {
    val admin = conn.getAdmin
    try {
      admin.deleteNamespace(namespace)
    } catch {
      case ex: IOException => ex.printStackTrace()
    } finally {
      try {
        admin.close()
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
  }

  /**
   * Delete a table; throws TableNotFoundException when the table does not exist.
   *
   * @param tableName table name
   */
  def dropTable(conn: Connection, tableName: String): Unit = {
    val admin = conn.getAdmin
    val table = TableName.valueOf(tableName)

    try {
      admin.disableTable(table)
      admin.deleteTable(table)
    } catch {
      case ex: IOException =>
        //LOG.debug(ex, ex.fillInStackTrace())
        ex.printStackTrace()
    } finally {
      try {
        admin.close()
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
  }

  /**
   * Delete a row / a column / a column family.
   *
   * @param tableName table name
   * @param rowkey    row key
   * @param limit     optional varargs [columnFamily, column]; with no extra
   *                  arguments the whole row is deleted by rowkey
   */
  def deleteData(conn: Connection, tableName: String, rowkey: String, limit: String*): Unit = {
    val table = TableName.valueOf(tableName)
    var currentTable: Table = null
    try {
      currentTable = conn.getTable(table)
      val del = new Delete(Bytes.toBytes(rowkey))
      if (limit.length > 2) {
        throw new IllegalArgumentException("wrong number of arguments")
      } else if (limit.length == 2) {
        del.addColumn(Bytes.toBytes(limit(0)), Bytes.toBytes(limit(1)))
      } else if (limit.length == 1) {
        del.addFamily(Bytes.toBytes(limit(0)))
      }
      currentTable.delete(del)
    } catch {
      case ex: Exception =>
        //LOG.debug(ex, ex.fillInStackTrace())
        ex.printStackTrace()
    } finally {
      try {
        if (currentTable != null) currentTable.close()
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
  }

  // Overload: rowkey given as an Int
  def deleteData(conn: Connection, tableName: String, rowkey: Int, limit: String*): Unit = {
    val table = TableName.valueOf(tableName)
    var currentTable: Table = null
    try {
      currentTable = conn.getTable(table)
      val del = new Delete(Bytes.toBytes(rowkey))
      if (limit.length > 2) {
        throw new IllegalArgumentException("wrong number of arguments")
      } else if (limit.length == 2) {
        del.addColumn(Bytes.toBytes(limit(0)), Bytes.toBytes(limit(1)))
      } else if (limit.length == 1) {
        del.addFamily(Bytes.toBytes(limit(0)))
      }
      currentTable.delete(del)
    } catch {
      case ex: Exception =>
        //LOG.debug(ex, ex.fillInStackTrace())
        ex.printStackTrace()
    } finally {
      try {
        if (currentTable != null) currentTable.close()
      } catch {
        case ex: Exception => ex.printStackTrace()
      }
    }
  }
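
  /* Usage sketch (illustrative; names are hypothetical). The varargs tail
   * selects the deletion granularity:
   *
   *   HbaseUtils.deleteData(conn, "t_demo", "row1")               // whole row
   *   HbaseUtils.deleteData(conn, "t_demo", "row1", "cf1")        // one column family
   *   HbaseUtils.deleteData(conn, "t_demo", "row1", "cf1", "q1")  // one cell
   */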

  /**
   * Delete data in a rowkey range.
   *
   * @param table       table name
   * @param startRowKey start of the rowkey range (inclusive)
   * @param endRowKey   end of the rowkey range (exclusive)
   */
  def batchDeleteData(conn: Connection, table: String, startRowKey: String, endRowKey: String): Unit = {
    val scan = new Scan
    scan.setStartRow(Bytes.toBytes(startRowKey))
    scan.setStopRow(Bytes.toBytes(endRowKey))
    val list = new util.ArrayList[Delete]()
    var scanner: ResultScanner = null
    var currentTable: Table = null

    try {
      currentTable = conn.getTable(TableName.valueOf(table))
      scanner = currentTable.getScanner(scan)
      val scIt = scanner.iterator()

      while (scIt.hasNext) {
        val del = new Delete(scIt.next.getRow)
        list.add(del)

        if (list.size() >= 4096) {
          currentTable.delete(list)
          list.clear()
        }
      }

      currentTable.delete(list)
    } catch {
      case ex: IOException => ex.printStackTrace()
    } finally {
      try {
        if (scanner != null) scanner.close()
        if (currentTable != null) currentTable.close()
      } catch {
        case ex: IOException => ex.printStackTrace()
      }
    }
  }
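
  /* Note: deletes are flushed every 4096 rows to bound client-side memory;
   * the trailing delete call flushes the final partial batch. Usage sketch
   * (hypothetical names):
   *
   *   HbaseUtils.batchDeleteData(conn, "t_demo", "row000", "row999")
   */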

  //******************************************** Queries

  /**
   * Get one full row whose rowkey equals the given key.
   *
   * @param tableName table name
   * @param rowkey    row key
   * @return the matching Result (may be empty), or null on failure
   */
  def getOneRowData(conn: Connection, tableName: String, rowkey: String): Result = {
    val table = TableName.valueOf(tableName)
    var result: Result = null
    var currentTable: Table = null
    try {
      currentTable = conn.getTable(table)
      val row = new Get(Bytes.toBytes(rowkey))
      result = currentTable.get(row)
      result
    } catch {
      case ex: Exception =>
        //LOG.debug(ex, ex.fillInStackTrace())
        ex.printStackTrace()
        result
    } finally {
      try {
        if (currentTable != null) currentTable.close()
      } catch {
        case ex: IOException => ex.printStackTrace()
      }
    }
  }
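
  /* Usage sketch (illustrative; family/qualifier names are hypothetical):
   * reading one cell out of the returned Result.
   *
   *   val result = HbaseUtils.getOneRowData(conn, "t_demo", "row1")
   *   if (result != null && !result.isEmpty) {
   *     val v = result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("q1"))
   *     println(Bytes.toString(v))
   *   }
   */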

  /**
   * Scan a range of rowkeys.
   *
   * @param tablename table name
   * @param startrow  start row (inclusive)
   * @param stoprow   stop row (exclusive)
   * @return a ResultScanner (the caller must close it), or null on failure
   */
  def scanRangeRow(conn: Connection, tablename: String, startrow: String, stoprow: String): ResultScanner = {
    var scanner: ResultScanner = null
    try {
      val table = conn.getTable(TableName.valueOf(tablename))
      val scan = new Scan()
      scan.setStartRow(Bytes.toBytes(startrow))
      scan.setStopRow(Bytes.toBytes(stoprow))
      scanner = table.getScanner(scan)
      scanner
    } catch {
      case ex: IOException =>
        ex.printStackTrace()
        scanner
    }
  }

  /**
   * Scan a rowkey range with a caller-supplied Scan (e.g., carrying filters).
   *
   * @param tablename table name
   * @param scan      pre-configured Scan
   * @return a ResultScanner (the caller must close it), or null on failure
   */
  def scanRangeRow(conn: Connection, tablename: String, scan: Scan): ResultScanner = {
    var scanner: ResultScanner = null
    try {
      val table = conn.getTable(TableName.valueOf(tablename))
      scanner = table.getScanner(scan)
      scanner
    } catch {
      case ex: IOException =>
        ex.printStackTrace()
        scanner
    }
  }
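
  /* Usage sketch (illustrative; names are hypothetical): building a Scan with
   * a server-side filter before handing it to the overload above. PrefixFilter
   * is just one example; remember to close the returned scanner.
   *
   *   import org.apache.hadoop.hbase.filter.PrefixFilter
   *   import scala.collection.JavaConverters._
   *   val scan = new Scan()
   *   scan.setFilter(new PrefixFilter(Bytes.toBytes("row0")))
   *   val scanner = HbaseUtils.scanRangeRow(conn, "t_demo", scan)
   *   try scanner.asScala.foreach(r => println(Bytes.toString(r.getRow)))
   *   finally scanner.close()
   */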

  /************************ Read HBase data with Spark
   *
   * The SparkConf behind the given SparkContext should use Kryo serialization.
   * Optional: sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
   */
  def sparkFromHbase(hbaseConf: Configuration, tablename: String, sc: SparkContext): RDD[Result] = {
    hbaseConf.set(TableInputFormat.INPUT_TABLE, tablename)
    // Read the table and turn it into an RDD of Results
    var rdd: RDD[Result] = null
    try {
      val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])
      rdd = hBaseRDD.map(x => x._2)
      rdd
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        rdd
    }
  }
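
  /* Usage sketch (illustrative; names are hypothetical). The RDD elements are
   * HBase Results; extract rowkeys or cells with the Bytes helpers:
   *
   *   val rdd = HbaseUtils.sparkFromHbase(HbaseUtils.getConf(), "t_demo", sc)
   *   rdd.map(r => Bytes.toString(r.getRow)).take(10).foreach(println)
   */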

  //************************************************* Inserts

  /**
   * Insert a single cell into a table.
   *
   * @param tableName table name
   * @param rowkey    row key
   * @param cf        column family
   * @param qualifier column qualifier
   */
  def insertData(conn: Connection, tableName: String, rowkey: String, cf: String, qualifier: String, value: String): Unit = {
    val table = TableName.valueOf(tableName)
    var currentTable: Table = null
    try {
      currentTable = conn.getTable(table)
      val put = new Put(Bytes.toBytes(rowkey))
      put.addColumn(Bytes.toBytes(cf), Bytes.toBytes(qualifier), Bytes.toBytes(value))
      currentTable.put(put)
    } catch {
      case ex: Exception =>
        //LOG.debug(ex, ex.fillInStackTrace())
        ex.printStackTrace()
    } finally {
      try {
        if (currentTable != null) currentTable.close()
      } catch {
        case ex: IOException => ex.printStackTrace()
      }
    }
  }

  /**
   * Batch-insert data into a table.
   *
   * @param tableName table name
   * @param puts      the Puts to insert
   */
  def putBatchData(conn: Connection, tableName: String, puts: util.List[Put]): Unit = {
    val table = TableName.valueOf(tableName)
    var currentTable: Table = null
    try {
      currentTable = conn.getTable(table)
      currentTable.put(puts)
    } catch {
      case ex: IOException => ex.printStackTrace()
    } finally {
      try {
        if (currentTable != null) currentTable.close()
      } catch {
        case ex: IOException => ex.printStackTrace()
      }
    }
  }
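
  /* Usage sketch (illustrative; names are hypothetical): building a java.util
   * list of Puts for the batch insert above.
   *
   *   val puts = new util.ArrayList[Put]()
   *   for (i <- 0 until 100) {
   *     val put = new Put(Bytes.toBytes(f"row$i%03d"))
   *     put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("q1"), Bytes.toBytes(i.toString))
   *     puts.add(put)
   *   }
   *   HbaseUtils.putBatchData(conn, "t_demo", puts)
   */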

  /**
   * ******************** Write data to HBase with Spark (new mapreduce API)
   *
   * @param tablename table name
   * @param rdd       RDD[Put] to write
   */
  def sparkToHbasemapreduce(hbaseConf: Configuration, tablename: String, rdd: RDD[Put]): Unit = {
    /*
     * The SparkConf behind the given RDD must use Kryo serialization.
     * Required: sparkconf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
     * Optional: sparkconf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
     */
    hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    var job: Job = null
    try {
      job = Job.getInstance(hbaseConf)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setOutputValueClass(classOf[Put]) // the written values are Puts
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
      rdd.map(x => (new ImmutableBytesWritable, x))
        .saveAsNewAPIHadoopDataset(job.getConfiguration)
    } catch {
      case ex: Exception => ex.printStackTrace()
    }
  }
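
  /* Usage sketch (illustrative; assumes a SparkContext `sc` whose SparkConf
   * enables Kryo as noted above, plus hypothetical table/column names):
   *
   *   val puts: RDD[Put] = sc.parallelize(1 to 100).map { i =>
   *     val p = new Put(Bytes.toBytes(s"row$i"))
   *     p.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("q1"), Bytes.toBytes(i.toString))
   *     p
   *   }
   *   HbaseUtils.sparkToHbasemapreduce(HbaseUtils.getConf(), "t_demo", puts)
   */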

  /**
   * ******************** Write data to HBase with Spark,
   * using the old mapred API.
   *
   * @param tablename table name
   * @param rdd       RDD[Put] to write
   */
  def sparkToHbaseMapred(hbaseConf: Configuration, tablename: String, rdd: RDD[Put]): Unit = {
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    try {
      val jobConf: JobConf = new JobConf(hbaseConf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
      rdd.map(x => (new ImmutableBytesWritable, x))
        .saveAsHadoopDataset(jobConf)
    } catch {
      case ex: Exception => ex.printStackTrace()
    }
  }

  /**
   * ************************* Streaming and other operations
   */

}
