如何理解flink 1.11 中的JDBC Catalog

技术如何理解flink 1.11 中的JDBC Catalog今天就跟大家聊聊有关如何理解flink 1.11 中的JDBC Catalog,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根

今天,我将和你谈谈如何理解flink 1.11中的JDBC目录。很多人可能不太了解。为了让大家更好的了解,边肖为大家总结了以下内容。希望你能从这篇文章中有所收获。

00-1010 1.11.0之前,如果用户依赖Flink的源/宿读写关系数据库或读取changelog,则必须手动创建相应的模式。但是,会有问题。当数据库中的模式发生变化时,还需要手动更新相应的Flink任务,以保持类型匹配。任何不匹配都会在运行时导致错误,并使作业失败。这种操作冗余且繁琐,体验极差。

事实上,任何与Flink连接的外部系统都可能存在上述类似的问题,在1.11.0中,主要解决了与关系数据库的接口问题。提供了JDBC目录的基本接口和Postgres目录的实现,便于后续实现与其他类型的关系数据库接口。

1.11.0版以后,用户使用Flink SQL时,无需输入DDL,就可以自动获取表的模式。此外,任何模式不匹配的错误都将在编译阶段提前检查和报告,从而避免由以前的运行时错误导致的作业失败。

00-1010目前,对于jdbc目录,flink只提供postgres目录。让我们解释如何使用基于postgres目录的flink目录。

引入pomdependency

groupIdorg.postgresql/groupId

artifactIdpostgresql/artifactId

版本42 . 2 . 5/版本

/依赖性

创建新的PostgresCatalog

目前,flink通过一个静态类创建相应的jdbc目录。对于PostgresCatalog,没有公共类型构造方法。当通过JdbcCatalogUtils.createCatalog构造PostgresCatalog时,这五个参数是必需的,其中baseUrl要求是不能带有数据库名的

StringcatalogName=' mycatalog

StringdefaultDatabase=' postgres ';

Stringusername=' postgres

Stringpwd=' postgres

stringbaseURl=' JDBC : PostgreSQL ://localhost :5432/';

PostgresCatalog PostgresCatalog=(PostgresCatalog)jdbccatalogils . createcatalog(

catalogName,

默认数据库,

用户名,

pwd,

baseURl);

e>
 

访问postgres 数据库指定表名的时候完整的路径名应该是以下格式:

<catalog>.<db>.`<schema.table>`

 

其中schema默认是public,如果是使用缺省值,public是可以省略的。比如下面的查询语句:

SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

 

如果非缺省schema,则不能被省略:

SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

 

  • 常见操作

我们PostgresCatalog将注册到StreamTableEnvironment 的变量tEnv中,然后就可以用tEnv进行一些操作了。

 tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
  tEnv.useCatalog(postgresCatalog.getName());

 

  1. 列出来所有的数据库:
        System.out.println("list databases :");
  String[] databases = tEnv.listDatabases();
  Stream.of(databases).forEach(System.out::println);

 

  1. 列出来所有的table
     tEnv.useDatabase(defaultDatabase);
  System.out.println("list tables :");
  String[] tables = tEnv.listTables(); // 也可以使用  postgresCatalog.listTables(defaultDatabase);
  Stream.of(tables).forEach(System.out::println);

 

  1. 列出所有函数
        System.out.println("list functions :");
  String[] functions = tEnv.listFunctions();
  Stream.of(functions).forEach(System.out::println);

 

  1. 获取table的schema
 CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
    defaultDatabase,
    "table1"));

  TableSchema tableSchema = catalogBaseTable.getSchema();
  System.out.println("tableSchema --------------------- :");
  System.out.println(tableSchema);

 

  1. 查询表的数据
  List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
                                             .execute()
                                             .collect());
  results.stream().forEach(System.out::println);

 

  1. 插入数据
tEnv.executeSql("insert into table1 values (3,'c')");

 

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java

 

源码解析

 

AbstractJdbcCatalog

这个类主要是对jdbc catalog一些公共的操作做了抽象.目前实现了实际功能的只有一个方法:getPrimaryKey,其他方式主要是对于Catalog的一些其他实现类做了特殊处理,比如类似create table 或者 alter table是不支持的,listView只是返回一个空列表,因为我们使用jdbc catalog主要是来做一些DML操作。

 @Override
 public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
  throw new UnsupportedOperationException();
 }

 @Override
 public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
  return Collections.emptyList();
 }

 
 

PostgresCatalog

在这里面,主要是实现了一些常用的操作数据库的方法,比如getTable、listTables、listDatabases等等,其实简单的来说就是从postgres元数据库里查询出来相应的信息,然后组装成flink的相关对象,返回给调用方。以一个简单的方法listDatabases为例:

从元数据表pg_database中查询所有的tablename,然后去掉内置的数据库,也就是template0和template1,然后封装到一个list对象里,返回。

 @Override
 public List<String> listDatabases() throws CatalogException {
  List<String> pgDatabases = new ArrayList<>();

  try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {

   PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");

   ResultSet rs = ps.executeQuery();

   while (rs.next()) {
    String dbName = rs.getString(1);
    if (!builtinDatabases.contains(dbName)) {
     pgDatabases.add(rs.getString(1));
    }
   }

   return pgDatabases;
  } catch (Exception e) {
   throw new CatalogException(
    String.format("Failed listing database in catalog %s", getName()), e);
  }
 }

 

有不兼容的地方需要做一些转换,比如getTable方法,有些数据类型是不匹配的,要做一些类型的匹配,如postgres里面的serial和int4都会转成flink的int类型,具体的参考下PostgresCatalog#fromJDBCType方法。

 private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
  String pgType = metadata.getColumnTypeName(colIndex);

  int precision = metadata.getPrecision(colIndex);
  int scale = metadata.getScale(colIndex);

  switch (pgType) {
   case PG_BOOLEAN:
    return DataTypes.BOOLEAN();
   case PG_BOOLEAN_ARRAY:
    return DataTypes.ARRAY(DataTypes.BOOLEAN());
   case PG_BYTEA:
    return DataTypes.BYTES();
    .........................

看完上述内容,你们对如何理解flink 1.11 中的JDBC Catalog有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/113486.html

(0)

相关推荐

  • ios怎么整个手机截屏(ios有哪几种截屏方式)

    技术IOS如何实现手机截屏小编给大家分享一下IOS如何实现手机截屏 ,相信大部分人都还不怎么了解,因此分享这篇文章给大家参考一下,希望大家阅读完这篇文章后大有收获,下面让我们一起去了解一下吧!IOS手机截屏 主要步骤1

    攻略 2021年12月24日
  • 「IOI2021」Dungeons

    技术「IOI2021」Dungeons 「IOI2021」Dungeons题目
    点这里看题目。
    分析
    比较考察基础的观察和诡异的优化的题目,值得一试。
    算法 1
    直接模拟,复杂度为 \(O(qs)\)。

    礼包 2021年11月18日
  • web边框设置弧形(网页中一竖杠怎么设计渐变)

    技术web中怎么用线性渐变实现斜线这篇文章主要介绍“web中怎么用线性渐变实现斜线”,在日常操作中,相信很多人在web中怎么用线性渐变实现斜线问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”

    攻略 2021年12月22日
  • 编程中常用正则表达式有哪些

    技术编程中常用正则表达式有哪些这篇文章将为大家详细讲解有关编程中常用正则表达式有哪些,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。一、校验数字的表达式1 数字:^[0-9]*$2 n位

    攻略 2021年11月23日
  • 抖音刷粉咋弄,抖音刷粉怎么操作

    技术抖音刷粉咋弄,抖音刷粉怎么操作抖音的玩法如此之多,其中最常见的一段视频的镜头切换是如何做的呢?点击进入抖音后点击“我”,然后再点击“作品”即可看到,目前,抖音内部只能查看自己的视频播放量,也只能在自己主页上的作品栏上

    测评 2021年10月19日
  • 大数据中AWK命令的示例分析

    技术大数据中AWK命令的示例分析这篇文章将为大家详细讲解有关大数据中AWK命令的示例分析,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。对于下面的nginx日志access.log,用脚

    攻略 2021年11月20日