本文将详细解释如何在Flink中获取TableAPI、SQL和Kafka消息。这篇文章的内容质量很高,所以边肖会分享给大家参考。希望你看完这篇文章后有所了解。
使用TbaleSQL和Flink kafka连接器从kafka的消息队列中获取数据。
示例环境
Java . version :1 . 8 . xfrink . version :1 . 11 . 1 Kafka:2.11示例数据源(项目代码云下载)
Flink系统的建设开发环境和数据
示例(pom.xml)
Flink系统的TableAPI SQL和样例模块
SelectToKafka.java
package com . flink . examples . Kafka;
import org . Apache . flink . streaming . API . TiME Template;
import org . Apache . flink . streaming . API . datastream . datastream;
import org . Apache . flink . streaming . API . environment . streaming executionenvironment;
import org . Apache . flink . table . API . EnvironmentSettings;
import org . Apache . flink . table . API . table;
import org . Apache . flink . table . API . bridge . Java . streamtableenvironment;
import org . Apache . flink . types . row;
/**
*@Description使用TbaleSQL和Flinkkafka连接器从Kafka的消息队列中获取数据。
*/
publicclassSelectToKafka{
/**
官方参考:https://ci . Apache . org/project/flink/flink-docs-release-1.12/zh/dev/table/connectors/Kafka . html。
起始偏移位置
configscan.startup.mode选项指定Kafka用户的启动模式。的有效枚举为:
集团抵销:从特定消费群体在ZK/卡夫卡经纪公司的承诺抵销开始。
早偏移:从最早偏移开始。
最新偏移:从最新偏移开始。
时间戳:从每个分区的用户提供的时间戳开始。
特定偏移量:从每个分区的用户提供的特定偏移量开始。
默认选项值组-抵销表示上次从ZK/卡夫卡经纪人提交的抵销消费。
保证一致性
Sink.semantic选项可选择三种不同的操作模式:
无:弗林克不能保证任何事情。生成的记录可能会丢失或重复。
At _ lease _ once(默认设置):这确保不会丢失任何记录(尽管它们可以被复制)。
恰好_一次:Kafka事务将用于提供准确的语义一次。无论何时使用事务写入Kafka,请不要忘记为任何使用Kafka记录的应用程序设置所需的设置隔离级别(read_committed或read_uncommit)
ted-后者是默认值)。
*/
static String table_sql = "CREATE TABLE KafkaTable (\n" +
" `user_id` BIGINT,\n" +
" `item_id` BIGINT,\n" +
" `behavior` STRING,\n" +
" `ts` TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'user_behavior',\n" +
" 'properties.bootstrap.servers' = '192.168.110.35:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
")";
public static void main(String[] args) throws Exception {
//构建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//默认流时间方式
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//构建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//构建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注册kafka数据维表
tEnv.executeSql(table_sql);
String sql = "select user_id,item_id,behavior,ts from KafkaTable";
Table table = tEnv.sqlQuery(sql);
//打印字段结构
table.printSchema();
//table 转成 dataStream 流
DataStream<Row> behaviorStream = tEnv.toAppendStream(table, Row.class);
behaviorStream.print();
env.execute();
}
}
打印结果
root |-- user_id: BIGINT |-- item_id: BIGINT |-- behavior: STRING |-- ts: TIMESTAMP(3) 3> 1,1,normal,2021-01-26T10:25:44
关于Flink中如何进行TableAPI 、SQL 与 Kafka 消息获取就分享到这里了,希望
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/146800.html