Flink 读写HBase入门示例 (Scala版本)
目录
1. 读取操作
2. 写入操作
Flink源码(v1.6.1)示例中只提供了Java版本,而没有Scala版本,于是仿照着写了Scala版本。
参考:
org.apache.flink.addons.hbase.example.HBaseReadExample
org.apache.flink.addons.hbase.example.HBaseWriteExample
1. 读取操作
HBase测试数据:
代码:
package com.ccclubs.flinkdemo
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.ConfigConstants
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.util.Bytes
/**
* Author: xianghu.wang
* Date: 2018/10/16
*
* Simple stub for HBase DataSet read
*
* 使用hbase shell创建表 test-table,测试此示例.
*
* 使用下述命令:
* create 'test-table', 'someCf'
* put 'test-table', '1', 'someCf:someQual', 'someString'
* put 'test-table', '2', 'someCf:someQual', 'anotherString'
*
* 测试结果应只返回第一行
*/
object ReadFromHBase {
  def main(args: Array[String]): Unit = {
    // Target HBase table, column family and qualifier to scan.
    val hbaseTable = "test-table"
    val family = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET)
    val qualifier = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET)

    // Batch execution environment.
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Input format that turns each HBase Result into a (rowKey, cellValue) tuple.
    val inputFormat = new TableInputFormat[Tuple2[String, String]] {
      // Reused output tuple to avoid a per-record allocation.
      private val tuple = new Tuple2[String, String]

      /**
       * Copies the row key and the selected cell value out of the
       * scan Result into the reused tuple.
       */
      override def mapResultToTuple(result: Result): Tuple2[String, String] = {
        tuple.setField(Bytes.toString(result.getRow), 0)
        tuple.setField(Bytes.toString(result.getValue(family, qualifier)), 1)
        tuple
      }

      override def getTableName: String = hbaseTable

      // Scan restricted to the single column this job reads.
      override def getScanner: Scan = new Scan().addColumn(family, qualifier)
    }

    // Keep only rows whose value starts with "someStr", then print.
    val hbaseDs = env
      .createInput(inputFormat)
      .filter(_.f1.startsWith("someStr"))
    hbaseDs.print()
  }
}
测试结果:
2. 写入操作
数据:
hbaseKv.txt
100,flink
101,spark
102,hadoop
代码:
package com.ccclubs.hbase
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job
/**
* Author: xianghu.wang
* Date: 2018/10/25
* Description: 写出数据到Hbase
* 示例数据:
* 100,flink
* 101,spark
* 102,hadoop
*/
object Write2Hbase {
  def main(args: Array[String]): Unit = {
    // Target HBase table, column family and qualifier.
    val tableName = "test-table"
    val cf = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET)
    val column = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET)

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Read "key,value" lines and parse them into (Int, String) tuples.
    val inputDs: DataSet[String] = env.readTextFile("src/main/resources/hbaseKv.txt")
    val outputDs: DataSet[Tuple2[Int, String]] = inputDs.map { line =>
      val kvs = line.split(",")
      new Tuple2(kvs(0).toInt, kvs(1))
    }

    // Configure the Hadoop job that backs TableOutputFormat.
    val job: Job = Job.getInstance
    job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    // TableOutputFormat requires an output dir to be set even though it is unused here.
    job.getConfiguration.set("mapred.output.dir", "/tmp")

    outputDs.map(new RichMapFunction[Tuple2[Int, String], Tuple2[Text, Mutation]] {
      // Reused output tuple; @transient so it is rebuilt in open() on each task
      // after the function is deserialized on the worker.
      @transient private var reuse: Tuple2[Text, Mutation] = null

      @throws(classOf[Exception])
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        reuse = new Tuple2[Text, Mutation]
      }

      /**
       * Converts an (Int, String) record into a (rowKey, Put) pair.
       *
       * FIX: the Text key is now encoded with the same string bytes as the
       * Put's row key. Previously it used Bytes.toBytes(t.f0) on an Int,
       * which yields 4 raw binary bytes — inconsistent with the row key
       * actually written to HBase by the Put.
       */
      @throws(classOf[Exception])
      override def map(t: Tuple2[Int, String]): Tuple2[Text, Mutation] = {
        // Single encoding of the row key, shared by the Text key and the Put.
        val rowKey = t.f0.toString.getBytes(ConfigConstants.DEFAULT_CHARSET)
        reuse.f0 = new Text(rowKey)
        val put = new Put(rowKey)
        put.addColumn(cf, column, Bytes.toBytes(t.f1))
        reuse.f1 = put
        reuse
      }
    }).output(new HadoopOutputFormat[Text, Mutation](new TableOutputFormat[Text], job))

    // output() is a lazy sink, so the job must be triggered explicitly.
    env.execute("write into hbase ...")
  }
}
插入数据前:
插入数据后:
本站文章为和通数据库网友分享或者投稿,欢迎任何形式的转载,但请务必注明出处.
同时文章内容如侵犯了您的权益,请联系QQ:970679559,我们会尽快处理。