欢迎投稿

今日深度:

十分钟入门Fink SQL,

十分钟入门Fink SQL,


前言

Flink 本身是批流统一的处理框架,所以 Table API 和 SQL,就是批流统一的上层处理 API。目前功能尚未完善,处于活跃的开发阶段。 Table API 是一套内嵌在 Java 和 Scala 语言中的查询 API,它允许我们以非常直观的方式,组合来自一些关系运算符的查询(比如 select、filter 和 join)。而对于 Flink SQL,就是直接可以在代码中写 SQL,来实现一些查询(Query)操作。Flink 的 SQL 支持,基于实现了 SQL 标准的 Apache Calcite(Apache 开源 SQL 解析工具)。图片

1、导入所需要的的依赖包

  1. <dependency> 
  2.           <groupId>org.apache.flink</groupId> 
  3.           <artifactId>flink-table-planner_2.12</artifactId> 
  4.           <version>1.10.1</version> 
  5.       </dependency> 
  6.       <dependency> 
  7.           <groupId>org.apache.flink</groupId> 
  8.           <artifactId>flink-table-api-scala-bridge_2.12</artifactId> 
  9.           <version>1.10.1</version> 
  10.       </dependency> 
  11.       <dependency> 
  12.           <groupId>org.apache.flink</groupId> 
  13.           <artifactId>flink-csv</artifactId> 
  14.           <version>1.10.1</version> 
  15.      </dependency> 

flink-table-planner:planner 计划器,是 table API 最主要的部分,提供了运行时环境和生成程序执行计划的 planner; flink-table-api-scala-bridge:bridge 桥接器,主要负责 table API 和 DataStream/DataSet API的连接支持,按照语言分 java 和 scala。

这里的两个依赖,是 IDE 环境下运行需要添加的;如果是生产环境,lib 目录下默认已经有了 planner,就只需要有 bridge 就可以了。

当然,如果想使用用户自定义函数,或是跟 kafka 做连接,需要有一个 SQL client,这个包含在 flink-table-common 里。

2、两种 planner(old& blink)的区别

3、表(Table)的概念

TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个Catalog-Table 表之间的 map。 表(Table)是由一个标识符来指定的,由 3 部分组成:Catalog 名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

4、连接到文件系统(Csv 格式)

连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传入一个 ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink内部已经提供了,就叫做 FileSystem()。

5、测试案例 (新)

需求: 将一个txt文本文件作为输入流读取数据过滤id不等于sensor_1的数据实现思路: 首先我们先构建一个table的env环境通过connect提供的方法来读取数据然后设置表结构将数据注册为一张表就可进行我们的数据过滤了(使用sql或者流处理方式进行解析)

准备数据

  1. sensor_1,1547718199,35.8 
  2. sensor_6,1547718201,15.4 
  3. sensor_7,1547718202,6.7 
  4. sensor_10,1547718205,38.1 
  5. sensor_1,1547718206,32 
  6. sensor_1,1547718208,36.2 
  7. sensor_1,1547718210,29.7 
  8. sensor_1,1547718213,30.9 

代码实现

  1. import org.apache.flink.streaming.api.scala._ 
  2. import org.apache.flink.table.api.{DataTypes} 
  3. import org.apache.flink.table.api.scala._ 
  4. import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} 
  5.  
  6. /** 
  7.  * @Package 
  8.  * @author 大数据老哥 
  9.  * @date 2020/12/12 21:22 
  10.  * @version V1.0 
  11.  *          第一个Flinksql测试案例 
  12.  */ 
  13.  
  14. object FlinkSqlTable { 
  15.   def main(args: Array[String]): Unit = { 
  16.     // 构建运行流处理的运行环境 
  17.     val env = StreamExecutionEnvironment.getExecutionEnvironment 
  18.     // 构建table环境 
  19.     val tableEnv = StreamTableEnvironment.create(env) 
  20.      //通过 connect 读取数据 
  21.     tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt")) 
  22.       .withFormat(new Csv()) //设置类型 
  23.       .withSchema(new Schema() // 给数据添加元数信息 
  24.         .field("id", DataTypes.STRING()) 
  25.         .field("time", DataTypes.BIGINT()) 
  26.         .field("temperature", DataTypes.DOUBLE()) 
  27.       ).createTemporaryTable("inputTable")  // 创建一个临时表 
  28.      
  29.     val resTable = tableEnv.from("inputTable") 
  30.       .select("*").filter('id === "sensor_1") 
  31.     // 使用sql的方式查询数据 
  32.     var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'") 
  33.     // 将数据转为流进行输出 
  34.     resTable.toAppendStream[(String, Long, Double)].print("resTable") 
  35.     resSql.toAppendStream[(String, Long, Double)].print("resSql") 
  36.  
  37.     env.execute("FlinkSqlWrodCount") 
  38.   } 

6、TableEnvironment 的作用

  • 注册 catalog
  • 在内部 catalog 中注册表
  • 执行 SQL 查询
  • 注册用户自定义函数
  • 注册用户自定义函数
  • 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数,可以用来配置 TableEnvironment 的一些特性。

7、 老版本创建流处理批处理

7.1老版本流处理

  1. val settings = EnvironmentSettings.newInstance() 
  2. .useOldPlanner() // 使用老版本 planner 
  3. .inStreamingMode() // 流处理模式 
  4. .build() 
  5. val tableEnv = StreamTableEnvironment.create(env, settings) 

7.2 老版本批处理

  1. val batchEnv = ExecutionEnvironment.getExecutionEnvironment  
  2. val batchTableEnv = BatchTableEnvironment.create(batchEnv) 

7.3 blink 版本的流处理环境

  1. val bsSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inStreamingMode().build() 
  4. val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) 

7.4 blink 版本的批处理环境

  1. val bbSettings = EnvironmentSettings.newInstance() 
  2. .useBlinkPlanner() 
  3. .inBatchMode().build() 
  4. val bbTableEnv = TableEnvironment.create(bbSettings) 

总结:

本篇文章主要讲解了Flink SQL 入门操作,后面我会分享一些关于Flink SQL连接Kafka、输出到kafka、MySQL等

本文转载自微信公众号「 大数据老哥」,可以通过以下二维码关注。转载本文请联系 大数据老哥公众号。

www.htsjk.Com true http://www.htsjk.com/shujukukf/43414.html NewsArticle 十分钟入门Fink SQL, 前言 Flink 本身是批流统一的处理框架,所以 Table API 和 SQL,就是批流统一的上层处理 API。目前功能尚未完善,处于活跃的开发阶段。 Table API 是一套内嵌在 Java 和...
评论暂时关闭