本文主要讲解“如何快速掌握Fink SQL”,文中讲解内容简单明了,易学易懂。请跟随边肖的思路,一起学习和学习“如何快速掌握Fink SQL”。
1、导入所需要的的依赖包
dependencygroupidg . Apache . Flink/group idartifactidflink-Table-Planner _ 2.12/artifactidversion 1 . 10 . 1/version/Dependency Dependency cygroupidg . Apache . Flink/group idartifactidflink-Table-API-Scala-bridge _ 2.12/artifactidversion 1 . 10 . 1/version/Dependency Dependency Flink-Table-Planner:Planner是table API最重要的部分,它提供了生成程序执行计划的运行时环境和计划器flink-table-API-scala-Bridge:Bridge Bridge,主要负责表API和DataStream/DataSet API之间的连接支持,按语言分为java和Scala。
在IDE环境中运行时,需要添加这两个依赖项。如果是生产环境,默认情况下计划器已经存在于lib目录中,因此只需要桥。
当然,如果您想使用用户定义的函数或与kafka连接,您需要有一个SQL客户端,它包含在flink-table-common中。
2、两种 planner(old blink)的区别
鸿蒙系统正式战略合作,打造——HarmonyOS技术社区。
均匀批处理流:Blink将批处理作业视为流处理的特例。因此,blink不支持表和数据集之间的转换,批处理作业不会转换为数据集应用程序,而是转换为数据流程序进行处理,就像流处理一样。
为了批量流的统一,Blink planner不支持BatchTableSource,而是使用有界的。
Blink planner只支持全新的目录,不支持过时的ExternalCatalog。
旧规划器和闪现规划器的可过滤列表。
ableSource 实现不兼容。旧的 planner 会把PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 则会把 Expressions 下推。
基于字符串的键值配置选项仅适用于 Blink planner。
PlannerConfig 在两个 planner 中的实现不同。
Blink planner 会将多个 sink 优化在一个 DAG 中(仅在 TableEnvironment 上受支持,而在 StreamTableEnvironment 上不受支持)。而旧 planner 的优化总是将每一个 sink 放在一个新的 DAG 中,其中所有 DAG 彼此独立。
旧的 planner 不支持目录统计,而 Blink planner 支持。
3、表(Table)的概念
TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个Catalog-Table 表之间的 map。 表(Table)是由一个标识符来指定的,由 3 部分组成:Catalog 名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。
4、连接到文件系统(Csv 格式)
连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传入一个 ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink内部已经提供了,就叫做 FileSystem()。
5、测试案例 (新)
需求: 将一个txt文本文件作为输入流读取数据过滤id不等于sensor_1的数据实现思路: 首先我们先构建一个table的env环境通过connect提供的方法来读取数据然后设置表结构将数据注册为一张表就可进行我们的数据过滤了(使用sql或者流处理方式进行解析)
准备数据
sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9
代码实现
import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{DataTypes} import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @Package * @author 大数据老哥 * @date 2020/12/12 21:22 * @version V1.0 * 第一个Flinksql测试案例 */ object FlinkSqlTable { def main(args: Array[String]): Unit = { // 构建运行流处理的运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment // 构建table环境 val tableEnv = StreamTableEnvironment.create(env) //通过 connect 读取数据 tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt")) .withFormat(new Csv()) //设置类型 .withSchema(new Schema() // 给数据添加元数信息 .field("id", DataTypes.STRING()) .field("time", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) ).createTemporaryTable("inputTable") // 创建一个临时表 val resTable = tableEnv.from("inputTable") .select("*").filter('id === "sensor_1") // 使用sql的方式查询数据 var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'") // 将数据转为流进行输出 resTable.toAppendStream[(String, Long, Double)].print("resTable") resSql.toAppendStream[(String, Long, Double)].print("resSql") env.execute("FlinkSqlWrodCount") } }
6、TableEnvironment 的作用
-
注册 catalog
-
在内部 catalog 中注册表
-
执行 SQL 查询
-
注册用户自定义函数
-
注册用户自定义函数
-
保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数,可以用来配置 TableEnvironment 的一些特性。
7、 老版本创建流处理批处理
7.1老版本流处理
val settings = EnvironmentSettings.newInstance() .useOldPlanner() // 使用老版本 planner .inStreamingMode() // 流处理模式 .build() val tableEnv = StreamTableEnvironment.create(env, settings)
7.2 老版本批处理
val batchEnv = ExecutionEnvironment.getExecutionEnvironment val batchTableEnv = BatchTableEnvironment.create(batchEnv)
7.3 blink 版本的流处理环境
val bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)
7.4 blink 版本的批处理环境
val bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)
感谢各位的阅读,以上就是“如何快速掌握Fink SQL”的内容了,经过本文的学习后,相信大家对如何快速掌握Fink SQL这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/41739.html