Reading Hive with Flink, and writing back to Hive

1. Reading works. Following material found online, the core is these two classes:

HCatInputFormat
HCatInputFormatBase

Both ultimately extend Flink's RichInputFormat:

public abstract class HCatInputFormatBase<T> extends RichInputFormat<T, HadoopInputSplit> implements ResultTypeQueryable<T>

Search for and download the jar that contains them (they ship in Flink's flink-hcatalog module), then pull the classes out.

Dependencies (roughly these):
<!-- Flink + Hive dependencies -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-fs</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>com.jolbox</groupId>
    <artifactId>bonecp</artifactId>
    <version>0.8.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>parquet-hive-bundle</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-metastore</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-cli</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-common</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-service</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-shims</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hive.hcatalog</groupId>
    <artifactId>hive-hcatalog-core</artifactId>
    <version>2.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libfb303</artifactId>
    <version>0.9.3</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hadoop-compatibility_2.11</artifactId>
    <version>1.6.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-hadoop2</artifactId>
    <version>1.6.2</version>
</dependency>
package com.coder.flink.core.FlinkHive

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.hcatalog.scala.HCatInputFormat // from the flink-hcatalog module
import org.apache.hadoop.conf.Configuration

// Read data from Hive via the HCatalog metastore.
object ReadHive {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.set("hive.metastore.local", "false")
    conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083")
    // For a highly available metastore, point at the nameservice / standby URI as well:
    // conf.set("hive.metastore.uris", "thrift://172.10.4.142:9083")

    val env = ExecutionEnvironment.getExecutionEnvironment

    // TamAlert is a user-defined case class whose fields match the columns
    // of the `aijiami.test` table; HCatInputFormat maps each row onto it.
    val dataset: DataSet[TamAlert] = env.createInput(new HCatInputFormat[TamAlert]("aijiami", "test", conf))
    dataset.first(10).print()
    // print() triggers execution, so no explicit env.execute() is needed here
  }
}
2. Writing is still a work in progress. One option is to extend RichOutputFormat yourself, but Flink also ships a wrapper for Hadoop output formats. I don't yet know how to implement the methods in detail; I'll dig into it when I have time.
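The wrapper mentioned above is Flink's HadoopOutputFormat from the flink-hadoop-compatibility module, which can wrap HCatalog's HCatOutputFormat. Below is a rough, untested sketch of that idea; the database/table (`aijiami.test_out`), the two-column schema, and the sample records are placeholders, not a verified implementation:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.WritableComparable
import org.apache.hadoop.mapreduce.Job
import org.apache.hive.hcatalog.data.{DefaultHCatRecord, HCatRecord}
import org.apache.hive.hcatalog.mapreduce.{HCatOutputFormat, OutputJobInfo}

// Sketch: write a DataSet into a Hive table through HCatOutputFormat.
object WriteHive {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val conf = new Configuration()
    conf.set("hive.metastore.uris", "thrift://172.10.4.141:9083")

    val job = Job.getInstance(conf)
    // Target table (placeholder names); null = no static partition values.
    HCatOutputFormat.setOutput(job, OutputJobInfo.create("aijiami", "test_out", null))
    // Reuse the table's own schema for the records we emit.
    HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job.getConfiguration))

    // Wrap the Hadoop OutputFormat so Flink's DataSet API can use it.
    val hcatOut = new HadoopOutputFormat[WritableComparable[_], HCatRecord](
      new HCatOutputFormat, job)

    // Build HCatRecords matching the table schema (2 columns assumed here);
    // HCatOutputFormat ignores the key, so it is left null.
    val records: DataSet[(WritableComparable[_], HCatRecord)] =
      env.fromElements(("a", 1), ("b", 2)).map { case (k, v) =>
        val rec = new DefaultHCatRecord(2)
        rec.set(0, k)
        rec.set(1, Integer.valueOf(v))
        (null.asInstanceOf[WritableComparable[_]], rec: HCatRecord)
      }

    records.output(hcatOut)
    env.execute("flink hive write test")
  }
}
```

This only sketches the unpartitioned case; writing into partitions would pass a partition map to OutputJobInfo.create instead of null.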