如何理解flink 1.11 中的JDBC Catalog

技术如何理解flink 1.11 中的JDBC Catalog今天就跟大家聊聊有关如何理解flink 1.11 中的JDBC Catalog,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根

今天,我将和你谈谈如何理解flink 1.11中的JDBC目录。很多人可能不太了解。为了让大家更好的了解,边肖为大家总结了以下内容。希望你能从这篇文章中有所收获。

00-1010 1.11.0之前,如果用户依赖Flink的源/宿读写关系数据库或读取changelog,则必须手动创建相应的模式。但是,会有问题。当数据库中的模式发生变化时,还需要手动更新相应的Flink任务,以保持类型匹配。任何不匹配都会在运行时导致错误,并使作业失败。这种操作冗余且繁琐,体验极差。

事实上,任何与Flink连接的外部系统都可能存在上述类似的问题,在1.11.0中,主要解决了与关系数据库的接口问题。提供了JDBC目录的基本接口和Postgres目录的实现,便于后续实现与其他类型的关系数据库接口。

1.11.0版以后,用户使用Flink SQL时,无需输入DDL,就可以自动获取表的模式。此外,任何模式不匹配的错误都将在编译阶段提前检查和报告,从而避免由以前的运行时错误导致的作业失败。

00-1010目前,对于jdbc目录,flink只提供postgres目录。让我们解释如何使用基于postgres目录的flink目录。

引入pomdependency

groupIdorg.postgresql/groupId

artifactIdpostgresql/artifactId

版本42 . 2 . 5/版本

/依赖性

创建新的PostgresCatalog

目前,flink通过一个静态类创建相应的jdbc目录。对于PostgresCatalog,没有公共类型构造方法。当通过JdbcCatalogUtils.createCatalog构造PostgresCatalog时,这五个参数是必需的,其中baseUrl要求是不能带有数据库名的

StringcatalogName=' mycatalog

StringdefaultDatabase=' postgres ';

Stringusername=' postgres

Stringpwd=' postgres

stringbaseURl=' JDBC : PostgreSQL ://localhost :5432/';

PostgresCatalog PostgresCatalog=(PostgresCatalog)jdbccatalogils . createcatalog(

catalogName,

默认数据库,

用户名,

pwd,

baseURl);

e>
 

访问postgres 数据库指定表名的时候完整的路径名应该是以下格式:

<catalog>.<db>.`<schema.table>`

 

其中schema默认是public,如果是使用缺省值,public是可以省略的。比如下面的查询语句:

SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

 

如果非缺省schema,则不能被省略:

SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

 

  • 常见操作

我们PostgresCatalog将注册到StreamTableEnvironment 的变量tEnv中,然后就可以用tEnv进行一些操作了。

 tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
  tEnv.useCatalog(postgresCatalog.getName());

 

  1. 列出来所有的数据库:
        System.out.println("list databases :");
  String[] databases = tEnv.listDatabases();
  Stream.of(databases).forEach(System.out::println);

 

  1. 列出来所有的table
     tEnv.useDatabase(defaultDatabase);
  System.out.println("list tables :");
  String[] tables = tEnv.listTables(); // 也可以使用  postgresCatalog.listTables(defaultDatabase);
  Stream.of(tables).forEach(System.out::println);

 

  1. 列出所有函数
        System.out.println("list functions :");
  String[] functions = tEnv.listFunctions();
  Stream.of(functions).forEach(System.out::println);

 

  1. 获取table的schema
 CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
    defaultDatabase,
    "table1"));

  TableSchema tableSchema = catalogBaseTable.getSchema();
  System.out.println("tableSchema --------------------- :");
  System.out.println(tableSchema);

 

  1. 查询表的数据
  List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
                                             .execute()
                                             .collect());
  results.stream().forEach(System.out::println);

 

  1. 插入数据
tEnv.executeSql("insert into table1 values (3,'c')");

 

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java

 

源码解析

 

AbstractJdbcCatalog

这个类主要是对jdbc catalog一些公共的操作做了抽象.目前实现了实际功能的只有一个方法:getPrimaryKey,其他方式主要是对于Catalog的一些其他实现类做了特殊处理,比如类似create table 或者 alter table是不支持的,listView只是返回一个空列表,因为我们使用jdbc catalog主要是来做一些DML操作。

 @Override
 public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
  throw new UnsupportedOperationException();
 }

 @Override
 public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
  return Collections.emptyList();
 }

 
 

PostgresCatalog

在这里面,主要是实现了一些常用的操作数据库的方法,比如getTable、listTables、listDatabases等等,其实简单的来说就是从postgres元数据库里查询出来相应的信息,然后组装成flink的相关对象,返回给调用方。以一个简单的方法listDatabases为例:

从元数据表pg_database中查询所有的tablename,然后去掉内置的数据库,也就是template0和template1,然后封装到一个list对象里,返回。

 @Override
 public List<String> listDatabases() throws CatalogException {
  List<String> pgDatabases = new ArrayList<>();

  try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {

   PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");

   ResultSet rs = ps.executeQuery();

   while (rs.next()) {
    String dbName = rs.getString(1);
    if (!builtinDatabases.contains(dbName)) {
     pgDatabases.add(rs.getString(1));
    }
   }

   return pgDatabases;
  } catch (Exception e) {
   throw new CatalogException(
    String.format("Failed listing database in catalog %s", getName()), e);
  }
 }

 

有不兼容的地方需要做一些转换,比如getTable方法,有些数据类型是不匹配的,要做一些类型的匹配,如postgres里面的serial和int4都会转成flink的int类型,具体的参考下PostgresCatalog#fromJDBCType方法。

 private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
  String pgType = metadata.getColumnTypeName(colIndex);

  int precision = metadata.getPrecision(colIndex);
  int scale = metadata.getScale(colIndex);

  switch (pgType) {
   case PG_BOOLEAN:
    return DataTypes.BOOLEAN();
   case PG_BOOLEAN_ARRAY:
    return DataTypes.ARRAY(DataTypes.BOOLEAN());
   case PG_BYTEA:
    return DataTypes.BYTES();
    .........................

看完上述内容,你们对如何理解flink 1.11 中的JDBC Catalog有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/113486.html

(0)

相关推荐

  • Hive怎么优化查询效率

    技术Hive怎么优化查询效率这篇文章将为大家详细讲解有关Hive怎么优化查询效率,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。 1,开启FetchTask一个简单的查询语句,是指一个没

    攻略 2021年12月10日
  • mysql复制表的几种方式

    技术mysql复制表的几种方式 mysql复制表的几种方式所描述的方法还请实际测试一下再使用.
    1、复制表结构及数据到新表
    CREATE TABLE 新表SELECT * FROM 旧表
    这种方法会将o

    礼包 2021年12月17日
  • school的音标,音标中长短音单词怎么区别

    技术school的音标,音标中长短音单词怎么区别因为相近,所以难辨school的音标。不是长短音的问题,发音长短不是区别的主要方面。长短音只是不负责任的简化表达。iː和ɪ,还有uː和ʊ的发音的区别,根本不是发音长短的问题

    生活 2021年10月19日
  • 休的四字词语,休字在后面的成语有哪些

    技术休的四字词语,休字在后面的成语有哪些一字千金yī zì qiān jīn[释义]增损一字休的四字词语;赏以千金。形容文辞精妙;不可更改。[语出]南北朝·钟嵘《诗品·古诗》:“文温以丽;意悲而远;惊心动魄;可谓几乎一字

    生活 2021年10月29日
  • Python 列表

    技术Python 列表 Python 列表Python 集合(数组)
    Python 编程语言中有四种集合数据类型:
    列表(List)是一种有序和可更改的集合。允许重复的成员。列表是一个有序且可更改的集合

    礼包 2021年11月14日
  • javascript中=、==、===是否有区别

    技术javascript中=、==、===是否有区别这篇文章主要为大家展示了“javascript中=、==、===是否有区别”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“j

    攻略 2021年11月14日