Flink Read/Write HBase Starter Example (Scala Version)
Contents

1. Read operation

2. Write operation


The Flink source tree (v1.6.1) only ships these examples in Java, with no Scala version, so I wrote Scala versions modeled on them.

References:

org.apache.flink.addons.hbase.example.HBaseReadExample

org.apache.flink.addons.hbase.example.HBaseWriteExample
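Both examples need the flink-hbase addon on the classpath. A sketch of the sbt dependencies, assuming Flink 1.6.1 and a Scala 2.11 build (adjust the versions to match your cluster):

```scala
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"  % "1.6.1",
  "org.apache.flink" %% "flink-hbase"  % "1.6.1"
)
```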

1. Read operation

HBase test data: (screenshot in the original post; the two rows match the put commands in the code comment below)

Code:

package com.ccclubs.flinkdemo

import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala._
import org.apache.flink.configuration.ConfigConstants
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.util.Bytes

/**
  * Author: xianghu.wang
  * Date: 2018/10/16
  *
  * Simple stub for an HBase DataSet read.
  *
  * To test this example, create the table 'test-table' in the hbase shell:
  * create 'test-table', 'someCf'
  * put 'test-table', '1', 'someCf:someQual', 'someString'
  * put 'test-table', '2', 'someCf:someQual', 'anotherString'
  *
  * Only the first row should be returned, since only its value
  * starts with "someStr".
  */
object ReadFromHBase {
  def main(args: Array[String]): Unit = {
    // table name
    val tableName = "test-table"
    // column family
    val cf = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET)
    // column qualifier
    val column = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET)

    // set up the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // input format for the HBase table
    val tableInputFormat = new TableInputFormat[Tuple2[String, String]] {

      // reused output tuple (avoids allocating a new tuple per record)
      val reuse = new Tuple2[String, String]

      /**
        * Copies the row key and cell value out of the HBase Result
        * into the reused Tuple2.
        */
      override def mapResultToTuple(result: Result): Tuple2[String, String] = {
        val key = Bytes.toString(result.getRow)
        val value = Bytes.toString(result.getValue(cf, column))
        reuse.setField(key, 0)
        reuse.setField(value, 1)
        reuse
      }

      override def getTableName: String = tableName

      override def getScanner: Scan = {
        val scan = new Scan
        scan.addColumn(cf, column)
        scan
      }
    }
    
    val hbaseDs = env.createInput(tableInputFormat)
      .filter(_.f1.startsWith("someStr")) // keep only rows whose value starts with "someStr"

    hbaseDs.print()
  }
}
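The filtering step itself is plain Scala; stripped of the HBase plumbing, its behavior can be checked in isolation (the row values below mirror the two put commands from the comment above):

```scala
object FilterCheck {
  def main(args: Array[String]): Unit = {
    // (rowKey, value) pairs matching the two 'put' commands
    val rows = Seq(("1", "someString"), ("2", "anotherString"))
    // same predicate as the .filter(...) on the DataSet
    val kept = rows.filter { case (_, value) => value.startsWith("someStr") }
    println(kept) // List((1,someString))
  }
}
```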

Test result: only the first row, (1,someString), is printed. (Screenshot in the original post.)

2. Write operation

Input data:

hbaseKv.txt

100,flink
101,spark
102,hadoop
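The job's first map step turns each CSV line into an (Int, String) pair. In isolation, assuming well-formed "key,value" lines, the same split-and-convert logic looks like this:

```scala
object ParseCheck {
  def main(args: Array[String]): Unit = {
    val lines = Seq("100,flink", "101,spark", "102,hadoop")
    // same split-and-convert logic as the DataSet map in the job below
    val pairs = lines.map { line =>
      val kvs = line.split(",")
      (kvs(0).toInt, kvs(1))
    }
    println(pairs) // List((100,flink), (101,spark), (102,hadoop))
  }
}
```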

Code:

package com.ccclubs.hbase

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.api.scala._
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.Job

/**
  * Author: xianghu.wang
  * Date: 2018/10/25
  * Description: writes data into HBase.
  * Sample input:
  * 100,flink
  * 101,spark
  * 102,hadoop
  */
object Write2Hbase {
  def main(args: Array[String]): Unit = {

    // table params
    val tableName = "test-table"
    val cf = "someCf".getBytes(ConfigConstants.DEFAULT_CHARSET)
    val column = "someQual".getBytes(ConfigConstants.DEFAULT_CHARSET)

    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // create data
    val inputDs: DataSet[String] = env.readTextFile("src/main/resources/hbaseKv.txt")

    val outputDs: DataSet[Tuple2[Int, String]] = inputDs.map(x => {
      val kvs = x.split(",")
      new Tuple2(kvs(0).toInt, kvs(1))
    })

    // set up the Hadoop job that carries the HBase output configuration
    val job: Job = Job.getInstance
    job.getConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // a dummy output path, required by the Hadoop output-format machinery
    // even though TableOutputFormat does not write files there
    job.getConfiguration.set("mapred.output.dir", "/tmp")

    outputDs.map(new RichMapFunction[Tuple2[Int, String], Tuple2[Text, Mutation]] {
      @transient private var reuse: Tuple2[Text, Mutation] = null

      @throws(classOf[Exception])
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        reuse = new Tuple2[Text, Mutation]
      }

      /**
        * Converts each Tuple2[Int, String] input record into a
        * Tuple2[Text, Mutation] that TableOutputFormat can write.
        */
      @throws(classOf[Exception])
      override def map(t: Tuple2[Int, String]): Tuple2[Text, Mutation] = {
        reuse.f0 = new Text(Bytes.toBytes(t.f0))
        // the Put carries the actual row key: the string form of the
        // integer key, e.g. "100"
        val put = new Put(t.f0.toString.getBytes(ConfigConstants.DEFAULT_CHARSET))
        put.addColumn(cf, column, Bytes.toBytes(t.f1))
        reuse.f1 = put
        reuse
      }
    }).output(new HadoopOutputFormat[Text, Mutation](new TableOutputFormat[Text], job))

    // execute program
    env.execute("write into hbase ...")

  }
}
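Once the job finishes, the new rows can be inspected from the hbase shell (assuming the same test-table as in the read example):

```
scan 'test-table'
```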

Table contents before the insert: (screenshot in the original post)

Table contents after the insert: (screenshot in the original post)
