如何理解flink 1.11 中的JDBC Catalog

技术如何理解flink 1.11 中的JDBC Catalog今天就跟大家聊聊有关如何理解flink 1.11 中的JDBC Catalog,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根

今天,我将和你谈谈如何理解flink 1.11中的JDBC目录。很多人可能不太了解。为了让大家更好的了解,边肖为大家总结了以下内容。希望你能从这篇文章中有所收获。

00-1010 1.11.0之前,如果用户依赖Flink的源/宿读写关系数据库或读取changelog,则必须手动创建相应的模式。但是,会有问题。当数据库中的模式发生变化时,还需要手动更新相应的Flink任务,以保持类型匹配。任何不匹配都会在运行时导致错误,并使作业失败。这种操作冗余且繁琐,体验极差。

事实上,任何与Flink连接的外部系统都可能存在上述类似的问题,在1.11.0中,主要解决了与关系数据库的接口问题。提供了JDBC目录的基本接口和Postgres目录的实现,便于后续实现与其他类型的关系数据库接口。

1.11.0版以后,用户使用Flink SQL时,无需输入DDL,就可以自动获取表的模式。此外,任何模式不匹配的错误都将在编译阶段提前检查和报告,从而避免由以前的运行时错误导致的作业失败。

00-1010目前,对于jdbc目录,flink只提供postgres目录。让我们解释如何使用基于postgres目录的flink目录。

引入pomdependency

groupIdorg.postgresql/groupId

artifactIdpostgresql/artifactId

版本42 . 2 . 5/版本

/依赖性

创建新的PostgresCatalog

目前,flink通过一个静态类创建相应的jdbc目录。对于PostgresCatalog,没有公共类型构造方法。当通过JdbcCatalogUtils.createCatalog构造PostgresCatalog时,这五个参数是必需的,其中baseUrl要求是不能带有数据库名的

StringcatalogName=' mycatalog

StringdefaultDatabase=' postgres ';

Stringusername=' postgres

Stringpwd=' postgres

stringbaseURl=' JDBC : PostgreSQL ://localhost :5432/';

PostgresCatalog PostgresCatalog=(PostgresCatalog)jdbccatalogils . createcatalog(

catalogName,

默认数据库,

用户名,

pwd,

baseURl);

e>
 

访问postgres 数据库指定表名的时候完整的路径名应该是以下格式:

<catalog>.<db>.`<schema.table>`

 

其中schema默认是public,如果是使用缺省值,public是可以省略的。比如下面的查询语句:

SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;

 

如果非缺省schema,则不能被省略:

SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;

 

  • 常见操作

我们PostgresCatalog将注册到StreamTableEnvironment 的变量tEnv中,然后就可以用tEnv进行一些操作了。

 tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
  tEnv.useCatalog(postgresCatalog.getName());

 

  1. 列出来所有的数据库:
        System.out.println("list databases :");
  String[] databases = tEnv.listDatabases();
  Stream.of(databases).forEach(System.out::println);

 

  1. 列出来所有的table
     tEnv.useDatabase(defaultDatabase);
  System.out.println("list tables :");
  String[] tables = tEnv.listTables(); // 也可以使用  postgresCatalog.listTables(defaultDatabase);
  Stream.of(tables).forEach(System.out::println);

 

  1. 列出所有函数
        System.out.println("list functions :");
  String[] functions = tEnv.listFunctions();
  Stream.of(functions).forEach(System.out::println);

 

  1. 获取table的schema
 CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
    defaultDatabase,
    "table1"));

  TableSchema tableSchema = catalogBaseTable.getSchema();
  System.out.println("tableSchema --------------------- :");
  System.out.println(tableSchema);

 

  1. 查询表的数据
  List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
                                             .execute()
                                             .collect());
  results.stream().forEach(System.out::println);

 

  1. 插入数据
tEnv.executeSql("insert into table1 values (3,'c')");

 

完整的代码请参考:

https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java

 

源码解析

 

AbstractJdbcCatalog

这个类主要是对jdbc catalog一些公共的操作做了抽象.目前实现了实际功能的只有一个方法:getPrimaryKey,其他方式主要是对于Catalog的一些其他实现类做了特殊处理,比如类似create table 或者 alter table是不支持的,listView只是返回一个空列表,因为我们使用jdbc catalog主要是来做一些DML操作。

 @Override
 public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
  throw new UnsupportedOperationException();
 }

 @Override
 public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
  return Collections.emptyList();
 }

 
 

PostgresCatalog

在这里面,主要是实现了一些常用的操作数据库的方法,比如getTable、listTables、listDatabases等等,其实简单的来说就是从postgres元数据库里查询出来相应的信息,然后组装成flink的相关对象,返回给调用方。以一个简单的方法listDatabases为例:

从元数据表pg_database中查询所有的tablename,然后去掉内置的数据库,也就是template0和template1,然后封装到一个list对象里,返回。

 @Override
 public List<String> listDatabases() throws CatalogException {
  List<String> pgDatabases = new ArrayList<>();

  try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {

   PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");

   ResultSet rs = ps.executeQuery();

   while (rs.next()) {
    String dbName = rs.getString(1);
    if (!builtinDatabases.contains(dbName)) {
     pgDatabases.add(rs.getString(1));
    }
   }

   return pgDatabases;
  } catch (Exception e) {
   throw new CatalogException(
    String.format("Failed listing database in catalog %s", getName()), e);
  }
 }

 

有不兼容的地方需要做一些转换,比如getTable方法,有些数据类型是不匹配的,要做一些类型的匹配,如postgres里面的serial和int4都会转成flink的int类型,具体的参考下PostgresCatalog#fromJDBCType方法。

 private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
  String pgType = metadata.getColumnTypeName(colIndex);

  int precision = metadata.getPrecision(colIndex);
  int scale = metadata.getScale(colIndex);

  switch (pgType) {
   case PG_BOOLEAN:
    return DataTypes.BOOLEAN();
   case PG_BOOLEAN_ARRAY:
    return DataTypes.ARRAY(DataTypes.BOOLEAN());
   case PG_BYTEA:
    return DataTypes.BYTES();
    .........................

看完上述内容,你们对如何理解flink 1.11 中的JDBC Catalog有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/113486.html

(0)

相关推荐

  • 怎么理解JavaScript中的语法和代码结构

    技术怎么理解JavaScript中的语法和代码结构本篇内容主要讲解“怎么理解JavaScript中的语法和代码结构”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“怎么理解Jav

    攻略 2021年11月20日
  • 孙悟空的人物特点,西游记关于孙悟空性格特点的句子

    技术孙悟空的人物特点,西游记关于孙悟空性格特点的句子1、外貌孙悟空的人物特点: 身穿金甲亮堂堂,头戴金冠光映映。手举金箍棒一根,足踏云鞋皆相称。一双怪眼似明星,两耳过肩查又硬。挺挺身才变化多,声音响亮如钟磬。尖嘴咨牙弼马

    生活 2021年10月25日
  • mysqldump怎么用

    技术mysqldump怎么用这篇文章主要介绍了mysqldump怎么用,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。 一、创建

    攻略 2021年11月3日
  • leetcode怎么删除排序链(leetcode 删除数组元素)

    技术LeetCode如何删除链表中指定的所有元素这篇文章主要为大家展示了“LeetCode如何删除链表中指定的所有元素”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“LeetC

    攻略 2021年12月15日
  • css font-style属性的作用是什么

    技术css font-style属性的作用是什么本篇内容主要讲解“css font-style属性的作用是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“css font-

    攻略 2021年11月3日
  • 女娲补天翻译,怎样正确理解“万恶淫为首”

    技术女娲补天翻译,怎样正确理解“万恶淫为首”真人真事女娲补天翻译,巳经过去了六年了!农村有买新娘现象,这就是真事,男的巳经三十多岁了,家里正为他 筹备婚事,女是越南妹,由于身份问题,不能 正式登记结婚,农村也我直接摆酒请

    生活 2021年10月22日