如何快速掌握Fink SQL

技术如何快速掌握Fink SQL这篇文章主要讲解了“如何快速掌握Fink SQL”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“如何快速掌握Fink SQL”吧!1、导入所

本文主要讲解“如何快速掌握Fink SQL”,文中讲解内容简单明了,易学易懂。请跟随边肖的思路,一起学习和学习“如何快速掌握Fink SQL”。

1、导入所需要的的依赖包

dependencygroupidg . Apache . Flink/group idartifactidflink-Table-Planner _ 2.12/artifactidversion 1 . 10 . 1/version/Dependency Dependency cygroupidg . Apache . Flink/group idartifactidflink-Table-API-Scala-bridge _ 2.12/artifactidversion 1 . 10 . 1/version/Dependency Dependency Flink-Table-Planner:Planner是table API最重要的部分,它提供了生成程序执行计划的运行时环境和计划器flink-table-API-scala-Bridge:Bridge Bridge,主要负责表API和DataStream/DataSet API之间的连接支持,按语言分为java和Scala。

在IDE环境中运行时,需要添加这两个依赖项。如果是生产环境,默认情况下计划器已经存在于lib目录中,因此只需要桥。

当然,如果您想使用用户定义的函数或与kafka连接,您需要有一个SQL客户端,它包含在flink-table-common中。

2、两种 planner(old blink)的区别

鸿蒙系统正式战略合作,打造——HarmonyOS技术社区。

均匀批处理流:Blink将批处理作业视为流处理的特例。因此,blink不支持表和数据集之间的转换,批处理作业不会转换为数据集应用程序,而是转换为数据流程序进行处理,就像流处理一样。

为了批量流的统一,Blink planner不支持BatchTableSource,而是使用有界的。

Blink planner只支持全新的目录,不支持过时的ExternalCatalog。

旧规划器和闪现规划器的可过滤列表。

ableSource 实现不兼容。旧的 planner  会把PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 则会把 Expressions  下推。

  • 基于字符串的键值配置选项仅适用于 Blink planner。

  • PlannerConfig 在两个 planner 中的实现不同。

  • Blink planner 会将多个 sink 优化在一个 DAG 中(仅在 TableEnvironment 上受支持,而在  StreamTableEnvironment 上不受支持)。而旧 planner 的优化总是将每一个 sink 放在一个新的 DAG 中,其中所有 DAG  彼此独立。

  • 旧的 planner 不支持目录统计,而 Blink planner 支持。

  • 3、表(Table)的概念

    TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个Catalog-Table 表之间的  map。 表(Table)是由一个标识符来指定的,由 3 部分组成:Catalog  名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

    4、连接到文件系统(Csv 格式)

    连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传入一个  ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink内部已经提供了,就叫做  FileSystem()。

    5、测试案例 (新)

    需求: 将一个txt文本文件作为输入流读取数据过滤id不等于sensor_1的数据实现思路:  首先我们先构建一个table的env环境通过connect提供的方法来读取数据然后设置表结构将数据注册为一张表就可进行我们的数据过滤了(使用sql或者流处理方式进行解析)

    准备数据

    sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

    代码实现

    import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{DataTypes} import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}  /**  * @Package  * @author 大数据老哥  * @date 2020/12/12 21:22  * @version V1.0  *          第一个Flinksql测试案例  */  object FlinkSqlTable {   def main(args: Array[String]): Unit = {     // 构建运行流处理的运行环境     val env = StreamExecutionEnvironment.getExecutionEnvironment     // 构建table环境     val tableEnv = StreamTableEnvironment.create(env)      //通过 connect 读取数据     tableEnv.connect(new FileSystem().path("D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt"))       .withFormat(new Csv()) //设置类型       .withSchema(new Schema() // 给数据添加元数信息         .field("id", DataTypes.STRING())         .field("time", DataTypes.BIGINT())         .field("temperature", DataTypes.DOUBLE())       ).createTemporaryTable("inputTable")  // 创建一个临时表          val resTable = tableEnv.from("inputTable")       .select("*").filter('id === "sensor_1")     // 使用sql的方式查询数据     var resSql = tableEnv.sqlQuery("select * from inputTable where id='sensor_1'")     // 将数据转为流进行输出     resTable.toAppendStream[(String, Long, Double)].print("resTable")     resSql.toAppendStream[(String, Long, Double)].print("resSql")      env.execute("FlinkSqlWrodCount")   } }

    6、TableEnvironment 的作用

    • 注册 catalog

    • 在内部 catalog 中注册表

    • 执行 SQL 查询

    • 注册用户自定义函数

    • 注册用户自定义函数

    • 保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

    在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数,可以用来配置  TableEnvironment 的一些特性。

    7、 老版本创建流处理批处理

    7.1老版本流处理

    val settings = EnvironmentSettings.newInstance() .useOldPlanner() // 使用老版本 planner .inStreamingMode() // 流处理模式 .build() val tableEnv = StreamTableEnvironment.create(env, settings)

    7.2 老版本批处理

    val batchEnv = ExecutionEnvironment.getExecutionEnvironment  val batchTableEnv = BatchTableEnvironment.create(batchEnv)

    7.3 blink 版本的流处理环境

    val bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

    7.4 blink 版本的批处理环境

    val bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)

    感谢各位的阅读,以上就是“如何快速掌握Fink SQL”的内容了,经过本文的学习后,相信大家对如何快速掌握Fink SQL这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

    内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/41739.html

    (1)

    相关推荐

    • 发朋友圈文字怎么不被折叠,苹果手机朋友圈折叠怎么处理

      技术发朋友圈文字怎么不被折叠,苹果手机朋友圈折叠怎么处理下载安装“讯飞输入法”,进入输入状态,点击“快捷输入”图标,进入“剪贴板”页面发朋友圈文字怎么不被折叠。打开“朋友圈模式”,需要开启完全访问状态,点击“立即开启”。

      生活 2021年10月30日
    • css如何设置单词内字母的间距

      技术css如何设置单词内字母的间距本篇内容介绍了“css如何设置单词内字母的间距”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有

      攻略 2021年11月15日
    • abab的词语,abab的形容词语有哪些

      技术abab的词语,abab的形容词语有哪些ABAB没有成语,词语有不少,列举如下abab的词语: 努力努力 享受享受 了解了解 打探打探 打听打听 分析分析 娱乐娱乐 紧张紧张 暖和暖和 凉快凉快 学习学习 精神精神

      生活 2021年10月25日
    • ai怎么画三角形,AI里怎么画圆角三角形

      技术ai怎么画三角形,AI里怎么画圆角三角形方法ai怎么画三角形:1、打开ai ctrl+n新建文件 选择“多边形工具”。
      2、在画板上按住左键画形状,默认出现的是五边形,按住左键不松手,同时点击“向下的方向键”每点

      生活 2021年10月24日
    • 视频类型,各种视频格式有什么区别

      技术视频类型,各种视频格式有什么区别简单说一下吧,太复杂的感觉一般人也不会去关心视频类型。 通俗点讲就是容器与容器中装的东西的区别,比如常见的MP4(容器),视频编码x264 x265,音频编码AAC FLAC DTS,

      生活 2021年10月22日
    • 方程式题目五年级,小学五年级数学应用题目什么没有

      技术方程式题目五年级,小学五年级数学应用题目什么没有某小学一班植树48棵,比二班少植树8棵,二班植树多少棵?人民小学食堂运来400千克大米,已经吃了5分之2,还剩多少千克?用一张边长20厘米的正方形纸,裁剪粘贴成一个无盖

      生活 2021年10月23日