今天,我将和你谈谈如何理解flink 1.11中的JDBC目录。很多人可能不太了解。为了让大家更好的了解,边肖为大家总结了以下内容。希望你能从这篇文章中有所收获。
00-1010 1.11.0之前,如果用户依赖Flink的源/宿读写关系数据库或读取changelog,则必须手动创建相应的模式。但是,会有问题。当数据库中的模式发生变化时,还需要手动更新相应的Flink任务,以保持类型匹配。任何不匹配都会在运行时导致错误,并使作业失败。这种操作冗余且繁琐,体验极差。
事实上,任何与Flink连接的外部系统都可能存在上述类似的问题,在1.11.0中,主要解决了与关系数据库的接口问题。提供了JDBC目录的基本接口和Postgres目录的实现,便于后续实现与其他类型的关系数据库接口。
1.11.0版以后,用户使用Flink SQL时,无需输入DDL,就可以自动获取表的模式。此外,任何模式不匹配的错误都将在编译阶段提前检查和报告,从而避免由以前的运行时错误导致的作业失败。
00-1010目前,对于jdbc目录,flink只提供postgres目录。让我们解释如何使用基于postgres目录的flink目录。
引入pomdependency
groupIdorg.postgresql/groupId
artifactIdpostgresql/artifactId
版本42 . 2 . 5/版本
/依赖性
创建新的PostgresCatalog
目前,flink通过一个静态类创建相应的jdbc目录。对于PostgresCatalog,没有公共类型构造方法。当通过JdbcCatalogUtils.createCatalog构造PostgresCatalog时,这五个参数是必需的,其中baseUrl要求是不能带有数据库名的
StringcatalogName=' mycatalog
StringdefaultDatabase=' postgres ';
Stringusername=' postgres
Stringpwd=' postgres
stringbaseURl=' JDBC : PostgreSQL ://localhost :5432/';
PostgresCatalog PostgresCatalog=(PostgresCatalog)jdbccatalogils . createcatalog(
catalogName,
默认数据库,
用户名,
pwd,
baseURl);
e>
访问postgres 数据库指定表名的时候完整的路径名应该是以下格式:
<catalog>.<db>.`<schema.table>`
其中schema默认是public,如果是使用缺省值,public是可以省略的。比如下面的查询语句:
SELECT * FROM mypg.mydb.test_table;
SELECT * FROM mydb.test_table;
SELECT * FROM test_table;
如果非缺省schema,则不能被省略:
SELECT * FROM mypg.mydb.`custom_schema.test_table2`
SELECT * FROM mydb.`custom_schema.test_table2`;
SELECT * FROM `custom_schema.test_table2`;
-
常见操作
我们PostgresCatalog将注册到StreamTableEnvironment 的变量tEnv中,然后就可以用tEnv进行一些操作了。
tEnv.registerCatalog(postgresCatalog.getName(), postgresCatalog);
tEnv.useCatalog(postgresCatalog.getName());
-
列出来所有的数据库:
System.out.println("list databases :");
String[] databases = tEnv.listDatabases();
Stream.of(databases).forEach(System.out::println);
-
列出来所有的table
tEnv.useDatabase(defaultDatabase);
System.out.println("list tables :");
String[] tables = tEnv.listTables(); // 也可以使用 postgresCatalog.listTables(defaultDatabase);
Stream.of(tables).forEach(System.out::println);
-
列出所有函数
System.out.println("list functions :");
String[] functions = tEnv.listFunctions();
Stream.of(functions).forEach(System.out::println);
-
获取table的schema
CatalogBaseTable catalogBaseTable = postgresCatalog.getTable(new ObjectPath(
defaultDatabase,
"table1"));
TableSchema tableSchema = catalogBaseTable.getSchema();
System.out.println("tableSchema --------------------- :");
System.out.println(tableSchema);
-
查询表的数据
List<Row> results = Lists.newArrayList(tEnv.sqlQuery("select * from table1")
.execute()
.collect());
results.stream().forEach(System.out::println);
-
插入数据
tEnv.executeSql("insert into table1 values (3,'c')");
完整的代码请参考:
https://github.com/zhangjun0x01/bigdata-examples/blob/master/flink/src/main/java/catalog/PostgresCatalogTest.java
源码解析
AbstractJdbcCatalog
这个类主要是对jdbc catalog一些公共的操作做了抽象.目前实现了实际功能的只有一个方法:getPrimaryKey,其他方式主要是对于Catalog的一些其他实现类做了特殊处理,比如类似create table 或者 alter table是不支持的,listView只是返回一个空列表,因为我们使用jdbc catalog主要是来做一些DML操作。
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
return Collections.emptyList();
}
PostgresCatalog
在这里面,主要是实现了一些常用的操作数据库的方法,比如getTable、listTables、listDatabases等等,其实简单的来说就是从postgres元数据库里查询出来相应的信息,然后组装成flink的相关对象,返回给调用方。以一个简单的方法listDatabases为例:
从元数据表pg_database中查询所有的tablename,然后去掉内置的数据库,也就是template0和template1,然后封装到一个list对象里,返回。
@Override
public List<String> listDatabases() throws CatalogException {
List<String> pgDatabases = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String dbName = rs.getString(1);
if (!builtinDatabases.contains(dbName)) {
pgDatabases.add(rs.getString(1));
}
}
return pgDatabases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
}
有不兼容的地方需要做一些转换,比如getTable方法,有些数据类型是不匹配的,要做一些类型的匹配,如postgres里面的serial和int4都会转成flink的int类型,具体的参考下PostgresCatalog#fromJDBCType方法。
private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
String pgType = metadata.getColumnTypeName(colIndex);
int precision = metadata.getPrecision(colIndex);
int scale = metadata.getScale(colIndex);
switch (pgType) {
case PG_BOOLEAN:
return DataTypes.BOOLEAN();
case PG_BOOLEAN_ARRAY:
return DataTypes.ARRAY(DataTypes.BOOLEAN());
case PG_BYTEA:
return DataTypes.BYTES();
.........................
看完上述内容,你们对如何理解flink 1.11 中的JDBC Catalog有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/113486.html