Flink: reading from Hive, writing to Hive

1. Reading works; it was pieced together from material found online. The core is implementing these two classes:

HCatInputFormat
HCatInputFormatBase

Both of these classes ultimately extend RichInputFormat:

public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T>

Find and download this jar online (the author found it via a Baidu search), then pull these classes out of it.
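To make the shape of those classes concrete, here is a rough skeleton of what any RichInputFormat subclass has to provide (method names are from Flink's InputFormat interface; the empty bodies are illustrative only, not the real HCatInputFormatBase, which delegates this work to HCatalog's readers):

```scala
import org.apache.flink.api.common.io.RichInputFormat
import org.apache.flink.api.common.io.statistics.BaseStatistics
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.io.{InputSplit, InputSplitAssigner}

// Illustrative skeleton only: the methods a RichInputFormat subclass
// must implement. A real Hive reader would create one split per file
// block and iterate records inside each split.
class MyHiveInputFormat[T] extends RichInputFormat[T, InputSplit] {
  override def configure(parameters: Configuration): Unit = ()
  override def getStatistics(cached: BaseStatistics): BaseStatistics = null
  override def createInputSplits(minNumSplits: Int): Array[InputSplit] = Array.empty
  override def getInputSplitAssigner(splits: Array[InputSplit]): InputSplitAssigner = null
  override def open(split: InputSplit): Unit = ()  // open one split for reading
  override def reachedEnd(): Boolean = true        // true when the split is exhausted
  override def nextRecord(reuse: T): T = reuse     // produce the next record
  override def close(): Unit = ()
}
```

The split/record lifecycle (createInputSplits, then open/nextRecord/close per split) is what lets Flink read a Hive table in parallel.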


 

Dependencies (roughly these):

 

<!-- Flink + Hive dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>1.6.2</version>
</dependency>

<dependency>
    <groupId>com.jolbox</groupId>
    <artifactId>bonecp</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>

<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>parquet-hive-bundle</artifactId>
    <version>1.6.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.1.0</version>
</dependency>


<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>2.1.0</version>
</dependency>


<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-cli</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-common</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-shims</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>2.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libfb303</artifactId>
    <version>0.9.3</version>
    <type>pom</type>
</dependency>


<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.6.2</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.6.2</version>
</dependency>
Read example (Scala):

package com.coder.flink.core.FlinkHive


import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.hadoop.conf.Configuration
// note: HCatInputFormat itself comes from the jar mentioned above


// Read data from Hive
object ReadHive {
  def main(args: Array[String]): Unit = {

      val conf = new Configuration()
      conf.set("hive.metastore.local", "false")

      conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083")
       // If the metastore is highly available, use the nameserver address instead
//      conf.set("hive.metastore.uris", "thrift://172.10.4.142:9083")

      val env = ExecutionEnvironment.getExecutionEnvironment

      // TODO: TamAlert is the record type returned for each row
      val dataset: DataSet[TamAlert] = env.createInput(new HCatInputFormat[TamAlert]("aijiami", "test", conf))

      dataset.first(10).print()
      // print() triggers execution, so env.execute() is not needed here
//      env.execute("flink hive test")


  }

}
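For context, TamAlert above is the author's own record type and is not shown in the post. A hypothetical placeholder, purely to make the snippet's shape concrete (the field names here are invented; the real class must match the columns of the Hive table being read):

```scala
// Hypothetical stand-in for the author's TamAlert record type.
// Field names are assumptions, not taken from the original post.
case class TamAlert(id: Long, alertType: String, createdAt: String)

// Constructing and inspecting a record locally:
val sample = TamAlert(1L, "login_failure", "2019-01-01 00:00:00")
println(sample.alertType)
```

A plain case class works here because Flink's Scala API can derive serializers for case classes whose fields it can analyze.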

2. Writing is still under investigation. It should require extending and implementing RichOutputFormat (the write-side counterpart of the classes above), though Flink does provide one.

I'm not yet sure how to implement the methods... will dig into it when there's time.
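Sketching the write side under that assumption, a RichOutputFormat subclass needs only four methods. This is a skeleton, not a working Hive writer; how writeRecord actually hands data to Hive (via an HCatalog writer, or by writing files into the table's HDFS location and then adding the partition) is exactly the open question:

```scala
import org.apache.flink.api.common.io.RichOutputFormat
import org.apache.flink.configuration.Configuration

// Skeleton only: the methods Flink's RichOutputFormat requires.
// The Hive-specific wiring inside each method is still to be worked out.
class MyHiveOutputFormat[T] extends RichOutputFormat[T] {
  override def configure(parameters: Configuration): Unit = ()
  override def open(taskNumber: Int, numTasks: Int): Unit = ()  // per-subtask setup
  override def writeRecord(record: T): Unit = ()                // emit one record to Hive
  override def close(): Unit = ()                               // flush and release resources
}
```

An alternative that avoids a custom OutputFormat entirely is writing files into the table's warehouse directory and registering the partition afterwards; whether that is acceptable depends on the table's format and freshness requirements.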
