提交 74b8bdee 编写于 作者: B bowen.li

[FLINK-16471][jdbc] develop JDBCCatalog and PostgresCatalog

closes #11336
上级 8a27bd91
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
rootLogger.level = OFF
rootLogger.appenderRef.test.ref = TestLogger
appender.testlogger.name = TestLogger
appender.testlogger.type = CONSOLE
appender.testlogger.target = SYSTEM_ERR
appender.testlogger.layout.type = PatternLayout
appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
......@@ -35,6 +35,11 @@ under the License.
<packaging>jar</packaging>
<properties>
<postgres.version>42.2.10</postgres.version>
<otj-pg-embedded.version>0.13.3</otj-pg-embedded.version>
</properties>
<dependencies>
<!-- Table ecosystem -->
<!-- Projects depending on this project won't depend on flink-table-*. -->
......@@ -53,13 +58,17 @@ under the License.
<scope>provided</scope>
</dependency>
<!-- Postgres dependencies -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.14.2.0</version>
<scope>test</scope>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>${postgres.version}</version>
<scope>provided</scope>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
......@@ -89,5 +98,24 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- Postgres test dependencies -->
<dependency>
<groupId>com.opentable.components</groupId>
<artifactId>otj-pg-embedded</artifactId>
<version>${otj-pg-embedded.version}</version>
<scope>test</scope>
</dependency>
<!-- Derby test dependencies -->
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<version>10.14.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io.jdbc.catalog;
import org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.catalog.CatalogPartition;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Abstract catalog for any JDBC catalogs.
*/
public abstract class AbstractJDBCCatalog extends AbstractCatalog {
private static final Logger LOG = LoggerFactory.getLogger(AbstractJDBCCatalog.class);
protected final String username;
protected final String pwd;
protected final String baseUrl;
protected final String defaultUrl;
public AbstractJDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
super(catalogName, defaultDatabase);
checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
JDBCCatalogUtils.validateJDBCUrl(baseUrl);
this.username = username;
this.pwd = pwd;
this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
this.defaultUrl = baseUrl + defaultDatabase;
}
@Override
public void open() throws CatalogException {
// test connection, fail early if we cannot connect to database
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
} catch (SQLException e) {
throw new ValidationException(
String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
}
LOG.info("Catalog {} established connection to {}", getName(), defaultUrl);
}
@Override
public void close() throws CatalogException {
LOG.info("Catalog {} closing", getName());
}
// ------ table factory ------
public Optional<TableFactory> getTableFactory() {
return Optional.of(new JDBCTableSourceSinkFactory());
}
// ------ databases ------
@Override
public boolean databaseExists(String databaseName) throws CatalogException {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
return listDatabases().contains(databaseName);
}
@Override
public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
// ------ tables and views ------
@Override
public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) throws TableNotExistException, TableAlreadyExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public List<String> listViews(String databaseName) throws DatabaseNotExistException, CatalogException {
return Collections.emptyList();
}
// ------ partitions ------
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) throws TableNotExistException, TableNotPartitionedException, CatalogException {
return Collections.emptyList();
}
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws TableNotExistException, TableNotPartitionedException, CatalogException {
return Collections.emptyList();
}
@Override
public List<CatalogPartitionSpec> listPartitionsByFilter(ObjectPath tablePath, List<Expression> filters) throws TableNotExistException, TableNotPartitionedException, CatalogException {
return Collections.emptyList();
}
@Override
public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition partition, boolean ignoreIfExists) throws TableNotExistException, TableNotPartitionedException, PartitionSpecInvalidException, PartitionAlreadyExistsException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogPartition newPartition, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
// ------ functions ------
@Override
public List<String> listFunctions(String dbName) throws DatabaseNotExistException, CatalogException {
return Collections.emptyList();
}
@Override
public CatalogFunction getFunction(ObjectPath functionPath) throws FunctionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public boolean functionExists(ObjectPath functionPath) throws CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) throws FunctionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
// ------ stats ------
@Override
public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
return CatalogTableStatistics.UNKNOWN;
}
@Override
public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) throws TableNotExistException, CatalogException {
return CatalogColumnStatistics.UNKNOWN;
}
@Override
public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
return CatalogTableStatistics.UNKNOWN;
}
@Override
public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) throws PartitionNotExistException, CatalogException {
return CatalogColumnStatistics.UNKNOWN;
}
@Override
public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException, TablePartitionedException {
throw new UnsupportedOperationException();
}
@Override
public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
@Override
public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) throws PartitionNotExistException, CatalogException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io.jdbc.catalog;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Catalogs for relational databases via JDBC.
*/
@PublicEvolving
public class JDBCCatalog extends AbstractJDBCCatalog {
private static final Logger LOG = LoggerFactory.getLogger(JDBCCatalog.class);
private final Catalog internal;
public JDBCCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
super(catalogName, defaultDatabase, username, pwd, baseUrl);
internal = JDBCCatalogUtils.createCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
}
// ------ databases -----
@Override
public List<String> listDatabases() throws CatalogException {
return internal.listDatabases();
}
@Override
public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
return internal.getDatabase(databaseName);
}
// ------ tables and views ------
@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
return internal.listTables(databaseName);
}
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
return internal.getTable(tablePath);
}
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
try {
return databaseExists(tablePath.getDatabaseName()) &&
listTables(tablePath.getDatabaseName()).contains(tablePath.getObjectName());
} catch (DatabaseNotExistException e) {
return false;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io.jdbc.catalog;
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialect;
import org.apache.flink.api.java.io.jdbc.dialect.JDBCDialects;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Utils for {@link JDBCCatalog}.
*/
public class JDBCCatalogUtils {
/**
* URL has to be without database, like "jdbc:postgresql://localhost:5432/" or "jdbc:postgresql://localhost:5432"
* rather than "jdbc:postgresql://localhost:5432/db".
*/
public static void validateJDBCUrl(String url) {
String[] parts = url.trim().split("\\/+");
checkArgument(parts.length == 2);
}
/**
* Create catalog instance from given information.
*/
public static AbstractJDBCCatalog createCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
JDBCDialect dialect = JDBCDialects.get(baseUrl).get();
if (dialect instanceof JDBCDialects.PostgresDialect) {
return new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
} else {
throw new UnsupportedOperationException(
String.format("Catalog for '%s' is not supported yet.", dialect)
);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io.jdbc.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* Catalog for PostgreSQL.
*/
@Internal
public class PostgresCatalog extends AbstractJDBCCatalog {
private static final Logger LOG = LoggerFactory.getLogger(PostgresCatalog.class);
public static final String POSTGRES_TABLE_TYPE = "postgres";
public static final String DEFAULT_DATABASE = "postgres";
// ------ Postgres default objects that shouldn't be exposed to users ------
private static final Set<String> builtinDatabases = new HashSet<String>() {{
add("template0");
add("template1");
}};
private static final Set<String> builtinSchemas = new HashSet<String>() {{
add("pg_toast");
add("pg_temp_1");
add("pg_toast_temp_1");
add("pg_catalog");
add("information_schema");
}};
protected PostgresCatalog(String catalogName, String defaultDatabase, String username, String pwd, String baseUrl) {
super(catalogName, defaultDatabase, username, pwd, baseUrl);
}
// ------ databases ------
@Override
public List<String> listDatabases() throws CatalogException {
List<String> pgDatabases = new ArrayList<>();
try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
PreparedStatement ps = conn.prepareStatement("SELECT datname FROM pg_database;");
ResultSet rs = ps.executeQuery();
while (rs.next()) {
String dbName = rs.getString(1);
if (!builtinDatabases.contains(dbName)) {
pgDatabases.add(rs.getString(1));
}
}
return pgDatabases;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
}
@Override
public CatalogDatabase getDatabase(String databaseName) throws DatabaseNotExistException, CatalogException {
if (listDatabases().contains(databaseName)) {
return new CatalogDatabaseImpl(Collections.emptyMap(), null);
} else {
throw new DatabaseNotExistException(getName(), databaseName);
}
}
// ------ tables ------
@Override
public List<String> listTables(String databaseName) throws DatabaseNotExistException, CatalogException {
if (!databaseExists(databaseName)) {
throw new DatabaseNotExistException(getName(), databaseName);
}
// get all schemas
try (Connection conn = DriverManager.getConnection(baseUrl + databaseName, username, pwd)) {
PreparedStatement ps = conn.prepareStatement("SELECT schema_name FROM information_schema.schemata;");
ResultSet rs = ps.executeQuery();
List<String> schemas = new ArrayList<>();
while (rs.next()) {
String pgSchema = rs.getString(1);
if (!builtinSchemas.contains(pgSchema)) {
schemas.add(pgSchema);
}
}
List<String> tables = new ArrayList<>();
for (String schema : schemas) {
PreparedStatement stmt = conn.prepareStatement(
"SELECT * \n" +
"FROM information_schema.tables \n" +
"WHERE table_type = 'BASE TABLE' \n" +
" AND table_schema = ? \n" +
"ORDER BY table_type, table_name;");
stmt.setString(1, schema);
ResultSet rstables = stmt.executeQuery();
while (rstables.next()) {
// position 1 is database name, position 2 is schema name, position 3 is table name
tables.add(schema + "." + rstables.getString(3));
}
}
return tables;
} catch (Exception e) {
throw new CatalogException(
String.format("Failed listing database in catalog %s", getName()), e);
}
}
@Override
public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistException, CatalogException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(getName(), tablePath);
}
PostgresTablePath pgPath = PostgresTablePath.fromFlinkTableName(tablePath.getObjectName());
try (Connection conn = DriverManager.getConnection(baseUrl + tablePath.getDatabaseName(), username, pwd)) {
PreparedStatement ps = conn.prepareStatement(
String.format("SELECT * FROM %s;", pgPath.getFullPath()));
ResultSetMetaData rsmd = ps.getMetaData();
String[] names = new String[rsmd.getColumnCount()];
DataType[] types = new DataType[rsmd.getColumnCount()];
for (int i = 1; i <= rsmd.getColumnCount(); i++) {
names[i - 1] = rsmd.getColumnName(i);
types[i - 1] = fromJDBCType(rsmd, i);
}
TableSchema tableSchema = new TableSchema.Builder().fields(names, types).build();
return new CatalogTableImpl(
tableSchema,
new HashMap<>(),
""
);
} catch (Exception e) {
throw new CatalogException(
String.format("Failed getting table %s", tablePath.getFullName()), e);
}
}
public static final String PG_BYTEA = "bytea";
public static final String PG_BYTEA_ARRAY = "_bytea";
public static final String PG_SMALLINT = "int2";
public static final String PG_SMALLINT_ARRAY = "_int2";
public static final String PG_INTEGER = "int4";
public static final String PG_INTEGER_ARRAY = "_int4";
public static final String PG_BIGINT = "int8";
public static final String PG_BIGINT_ARRAY = "_int8";
public static final String PG_REAL = "float4";
public static final String PG_REAL_ARRAY = "_float4";
public static final String PG_DOUBLE_PRECISION = "float8";
public static final String PG_DOUBLE_PRECISION_ARRAY = "_float8";
public static final String PG_NUMERIC = "numeric";
public static final String PG_NUMERIC_ARRAY = "_numeric";
public static final String PG_BOOLEAN = "bool";
public static final String PG_BOOLEAN_ARRAY = "_bool";
public static final String PG_TIMESTAMP = "timestamp";
public static final String PG_TIMESTAMP_ARRAY = "_timestamp";
public static final String PG_TIMESTAMPTZ = "timestamptz";
public static final String PG_TIMESTAMPTZ_ARRAY = "_timestamptz";
public static final String PG_DATE = "date";
public static final String PG_DATE_ARRAY = "_date";
public static final String PG_TIME = "time";
public static final String PG_TIME_ARRAY = "_time";
public static final String PG_TEXT = "text";
public static final String PG_TEXT_ARRAY = "_text";
public static final String PG_CHAR = "bpchar";
public static final String PG_CHAR_ARRAY = "_bpchar";
public static final String PG_CHARACTER = "character";
public static final String PG_CHARACTER_ARRAY = "_character";
public static final String PG_CHARACTER_VARYING = "varchar";
public static final String PG_CHARACTER_VARYING_ARRAY = "_varchar";
private DataType fromJDBCType(ResultSetMetaData metadata, int colIndex) throws SQLException {
String pgType = metadata.getColumnTypeName(colIndex);
int precision = metadata.getPrecision(colIndex);
switch (pgType) {
case PG_BOOLEAN:
return DataTypes.BOOLEAN();
case PG_BOOLEAN_ARRAY:
return DataTypes.ARRAY(DataTypes.BOOLEAN());
case PG_BYTEA:
return DataTypes.BYTES();
case PG_BYTEA_ARRAY:
return DataTypes.ARRAY(DataTypes.BYTES());
case PG_SMALLINT:
return DataTypes.SMALLINT();
case PG_SMALLINT_ARRAY:
return DataTypes.ARRAY(DataTypes.SMALLINT());
case PG_INTEGER:
return DataTypes.INT();
case PG_INTEGER_ARRAY:
return DataTypes.ARRAY(DataTypes.INT());
case PG_BIGINT:
return DataTypes.BIGINT();
case PG_BIGINT_ARRAY:
return DataTypes.ARRAY(DataTypes.BIGINT());
case PG_REAL:
return DataTypes.FLOAT();
case PG_REAL_ARRAY:
return DataTypes.ARRAY(DataTypes.FLOAT());
case PG_DOUBLE_PRECISION:
return DataTypes.DOUBLE();
case PG_DOUBLE_PRECISION_ARRAY:
return DataTypes.ARRAY(DataTypes.DOUBLE());
case PG_NUMERIC:
return DataTypes.DECIMAL(precision, metadata.getScale(colIndex));
case PG_NUMERIC_ARRAY:
return DataTypes.ARRAY(
DataTypes.DECIMAL(precision, metadata.getScale(colIndex)));
case PG_CHAR:
case PG_CHARACTER:
return DataTypes.CHAR(precision);
case PG_CHAR_ARRAY:
case PG_CHARACTER_ARRAY:
return DataTypes.ARRAY(DataTypes.CHAR(precision));
case PG_CHARACTER_VARYING:
return DataTypes.VARCHAR(precision);
case PG_CHARACTER_VARYING_ARRAY:
return DataTypes.ARRAY(DataTypes.VARCHAR(precision));
case PG_TEXT:
return DataTypes.STRING();
case PG_TEXT_ARRAY:
return DataTypes.ARRAY(DataTypes.STRING());
case PG_TIMESTAMP:
return DataTypes.TIMESTAMP();
case PG_TIMESTAMP_ARRAY:
return DataTypes.ARRAY(DataTypes.TIMESTAMP());
case PG_TIMESTAMPTZ:
return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE();
case PG_TIMESTAMPTZ_ARRAY:
return DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
case PG_TIME:
return DataTypes.TIME();
case PG_TIME_ARRAY:
return DataTypes.ARRAY(DataTypes.TIME());
case PG_DATE:
return DataTypes.DATE();
case PG_DATE_ARRAY:
return DataTypes.ARRAY(DataTypes.DATE());
default:
throw new UnsupportedOperationException(
String.format("Doesn't support Postgres type '%s' yet", pgType));
}
}
@Override
public boolean tableExists(ObjectPath tablePath) throws CatalogException {
List<String> tables = null;
try {
tables = listTables(tablePath.getDatabaseName());
} catch (DatabaseNotExistException e) {
return false;
}
return tables.contains(PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getFullPath());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io.jdbc.catalog;
import org.apache.flink.util.StringUtils;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkArgument;
/**
* Table path of PostgreSQL in Flink. Can be of formats "table_name" or "schema_name.table_name".
* When it's "table_name", the schema name defaults to "public".
*/
public class PostgresTablePath {
private static final String DEFAULT_POSTGRES_SCHEMA_NAME = "public";
private final String pgSchemaName;
private final String pgTableName;
public PostgresTablePath(String pgSchemaName, String pgTableName) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgSchemaName));
checkArgument(!StringUtils.isNullOrWhitespaceOnly(pgTableName));
this.pgSchemaName = pgSchemaName;
this.pgTableName = pgTableName;
}
public static PostgresTablePath fromFlinkTableName(String flinkTableName) {
if (flinkTableName.contains(".")) {
String[] path = flinkTableName.split("\\.");
checkArgument(path != null && path.length == 2,
String.format("Table name '%s' is not valid. The parsed length is %d", flinkTableName, path.length));
return new PostgresTablePath(path[0], path[1]);
} else {
return new PostgresTablePath(DEFAULT_POSTGRES_SCHEMA_NAME, flinkTableName);
}
}
public static String toFlinkTableName(String schema, String table) {
return new PostgresTablePath(schema, table).getFullPath();
}
public String getFullPath() {
return String.format("%s.%s", pgSchemaName, pgTableName);
}
public String getFullPathWithQuotes() {
return String.format("`%s.%s`", pgSchemaName, pgTableName);
}
@Override
public String toString() {
return getFullPathWithQuotes();
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PostgresTablePath that = (PostgresTablePath) o;
return Objects.equals(pgSchemaName, that.pgSchemaName) &&
Objects.equals(pgTableName, that.pgTableName);
}
@Override
public int hashCode() {
return Objects.hash(pgSchemaName, pgTableName);
}
}
......@@ -203,7 +203,10 @@ public final class JDBCDialects {
}
}
private static class MySQLDialect extends AbstractDialect {
/**
* MySQL dialect.
*/
public static class MySQLDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
......@@ -301,7 +304,10 @@ public final class JDBCDialects {
}
}
private static class PostgresDialect extends AbstractDialect {
/**
* Postgres dialect.
*/
public static class PostgresDialect extends AbstractDialect {
private static final long serialVersionUID = 1L;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io.jdbc.catalog;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
/**
* Test for {@link JDBCCatalogUtils}.
*/
public class JDBCCatalogUtilsTest {
@Rule
public ExpectedException exception = ExpectedException.none();
@Test
public void testJDBCUrl() {
JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432/");
JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432");
}
@Test
public void testInvalidJDBCUrl() {
exception.expect(IllegalArgumentException.class);
JDBCCatalogUtils.validateJDBCUrl("jdbc:postgresql://localhost:5432/db");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io.jdbc.catalog;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import com.opentable.db.postgres.junit.EmbeddedPostgresRules;
import com.opentable.db.postgres.junit.SingleInstancePostgresRule;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Test for {@link PostgresCatalog}.
*/
public class PostgresCatalogITCase {
@Rule
public ExpectedException exception = ExpectedException.none();
@ClassRule
public static SingleInstancePostgresRule pg = EmbeddedPostgresRules.singleInstance();
protected static final String TEST_USERNAME = "postgres";
protected static final String TEST_PWD = "postgres";
protected static final String TEST_DB = "test";
protected static final String TEST_SCHEMA = "test_schema";
protected static final String TABLE1 = "t1";
protected static final String TABLE2 = "t2";
protected static final String TABLE3 = "t3";
protected static String baseUrl;
protected static Catalog catalog;
public static Catalog createCatalog(String name, String defaultDb, String username, String pwd, String jdbcUrl) {
return new PostgresCatalog("mypg", PostgresCatalog.DEFAULT_DATABASE, username, pwd, jdbcUrl);
}
@BeforeClass
public static void setup() throws SQLException {
// jdbc:postgresql://localhost:50807/postgres?user=postgres
String embeddedJdbcUrl = pg.getEmbeddedPostgres().getJdbcUrl(TEST_USERNAME, TEST_PWD);
// jdbc:postgresql://localhost:50807/
baseUrl = embeddedJdbcUrl.substring(0, embeddedJdbcUrl.lastIndexOf("/") + 1);
catalog = createCatalog("mypg", PostgresCatalog.DEFAULT_DATABASE, TEST_USERNAME, TEST_PWD, baseUrl);
// create test database and schema
createDatabase(TEST_DB);
createSchema(TEST_DB, TEST_SCHEMA);
// create test tables
// table: postgres.public.user1
createTable(PostgresTablePath.fromFlinkTableName(TABLE1), getSimpleTable().pgSchemaSql);
// table: testdb.public.user2
// table: testdb.testschema.user3
// table: testdb.public.datatypes
createTable(TEST_DB, PostgresTablePath.fromFlinkTableName(TABLE2), getSimpleTable().pgSchemaSql);
createTable(TEST_DB, new PostgresTablePath(TEST_SCHEMA, TABLE3), getSimpleTable().pgSchemaSql);
createTable(TEST_DB, PostgresTablePath.fromFlinkTableName("datatypes"), getDataTypesTable().pgSchemaSql);
}
// ------ databases ------
@Test
public void testGetDb_DatabaseNotExistException() throws Exception {
exception.expect(DatabaseNotExistException.class);
exception.expectMessage("Database nonexistent does not exist in Catalog");
catalog.getDatabase("nonexistent");
}
@Test
public void testListDatabases() {
List<String> actual = catalog.listDatabases();
assertEquals(
Arrays.asList("postgres", "test"),
actual
);
}
@Test
public void testDbExists() throws Exception {
assertFalse(catalog.databaseExists("nonexistent"));
assertTrue(catalog.databaseExists(PostgresCatalog.DEFAULT_DATABASE));
}
// ------ tables ------
@Test
public void testListTables() throws DatabaseNotExistException {
List<String> actual = catalog.listTables(PostgresCatalog.DEFAULT_DATABASE);
assertEquals(Arrays.asList("public.t1"), actual);
actual = catalog.listTables(TEST_DB);
assertEquals(Arrays.asList("public.datatypes", "public.t2", "test_schema.t3"), actual);
}
@Test
public void testListTables_DatabaseNotExistException() throws DatabaseNotExistException {
exception.expect(DatabaseNotExistException.class);
catalog.listTables("postgres/nonexistschema");
}
@Test
public void testTableExists() {
assertFalse(catalog.tableExists(new ObjectPath(TEST_DB, "nonexist")));
assertTrue(catalog.tableExists(new ObjectPath(PostgresCatalog.DEFAULT_DATABASE, TABLE1)));
assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, TABLE2)));
assertTrue(catalog.tableExists(new ObjectPath(TEST_DB, "test_schema.t3")));
}
@Test
public void testGetTables_TableNotExistException() throws TableNotExistException {
exception.expect(TableNotExistException.class);
catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
}
@Test
public void testGetTables_TableNotExistException_NoSchema() throws TableNotExistException {
exception.expect(TableNotExistException.class);
catalog.getTable(new ObjectPath(TEST_DB, PostgresTablePath.toFlinkTableName("nonexistschema", "anytable")));
}
@Test
public void testGetTables_TableNotExistException_NoDb() throws TableNotExistException {
exception.expect(TableNotExistException.class);
catalog.getTable(new ObjectPath("nonexistdb", PostgresTablePath.toFlinkTableName(TEST_SCHEMA, "anytable")));
}
@Test
public void testGetTable() throws org.apache.flink.table.catalog.exceptions.TableNotExistException {
// test postgres.public.user1
TableSchema schema = getSimpleTable().schema;
CatalogBaseTable table = catalog.getTable(new ObjectPath("postgres", TABLE1));
assertEquals(schema, table.getSchema());
table = catalog.getTable(new ObjectPath("postgres", "public.t1"));
assertEquals(schema, table.getSchema());
// test testdb.public.user2
table = catalog.getTable(new ObjectPath(TEST_DB, TABLE2));
assertEquals(schema, table.getSchema());
table = catalog.getTable(new ObjectPath(TEST_DB, "public.t2"));
assertEquals(schema, table.getSchema());
// test testdb.testschema.user2
table = catalog.getTable(new ObjectPath(TEST_DB, TEST_SCHEMA + ".t3"));
assertEquals(schema, table.getSchema());
}
@Test
public void testDataTypes() throws TableNotExistException {
CatalogBaseTable table = catalog.getTable(new ObjectPath(TEST_DB, "datatypes"));
assertEquals(getDataTypesTable().schema, table.getSchema());
}
private static class TestTable {
TableSchema schema;
String pgSchemaSql;
public TestTable(TableSchema schema, String pgSchemaSql) {
this.schema = schema;
this.pgSchemaSql = pgSchemaSql;
}
}
private static TestTable getSimpleTable() {
return new TestTable(
TableSchema.builder()
.field("name", DataTypes.INT())
.build(),
"name integer"
);
}
private static TestTable getDataTypesTable() {
return new TestTable(
TableSchema.builder()
.field("int", DataTypes.INT())
.field("int_arr", DataTypes.ARRAY(DataTypes.INT()))
.field("bytea", DataTypes.BYTES())
.field("bytea_arr", DataTypes.ARRAY(DataTypes.BYTES()))
.field("short", DataTypes.SMALLINT())
.field("short_arr", DataTypes.ARRAY(DataTypes.SMALLINT()))
.field("long", DataTypes.BIGINT())
.field("long_arr", DataTypes.ARRAY(DataTypes.BIGINT()))
.field("real", DataTypes.FLOAT())
.field("real_arr", DataTypes.ARRAY(DataTypes.FLOAT()))
.field("double_precision", DataTypes.DOUBLE())
.field("double_precision_arr", DataTypes.ARRAY(DataTypes.DOUBLE()))
.field("numeric", DataTypes.DECIMAL(10, 5))
.field("numeric_arr", DataTypes.ARRAY(DataTypes.DECIMAL(10, 5)))
.field("boolean", DataTypes.BOOLEAN())
.field("boolean_arr", DataTypes.ARRAY(DataTypes.BOOLEAN()))
.field("text", DataTypes.STRING())
.field("text_arr", DataTypes.ARRAY(DataTypes.STRING()))
.field("char", DataTypes.CHAR(1))
.field("char_arr", DataTypes.ARRAY(DataTypes.CHAR(1)))
.field("character", DataTypes.CHAR(3))
.field("character_arr", DataTypes.ARRAY(DataTypes.CHAR(3)))
.field("character_varying", DataTypes.VARCHAR(20))
.field("character_varying_arr", DataTypes.ARRAY(DataTypes.VARCHAR(20)))
.field("timestamp", DataTypes.TIMESTAMP())
.field("timestamp_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP()))
.field("timestamptz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
.field("timestamptz_arr", DataTypes.ARRAY(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()))
.field("date", DataTypes.DATE())
.field("date_arr", DataTypes.ARRAY(DataTypes.DATE()))
.field("time", DataTypes.TIME())
.field("time_arr", DataTypes.ARRAY(DataTypes.TIME()))
.build(),
"int integer, " +
"int_arr integer[], " +
"bytea bytea, " +
"bytea_arr bytea[], " +
"short smallint, " +
"short_arr smallint[], " +
"long bigint, " +
"long_arr bigint[], " +
"real real, " +
"real_arr real[], " +
"double_precision double precision, " +
"double_precision_arr double precision[], " +
"numeric numeric(10, 5), " +
"numeric_arr numeric(10, 5)[], " +
"boolean boolean, " +
"boolean_arr boolean[], " +
"text text, " +
"text_arr text[], " +
"char char, " +
"char_arr char[], " +
"character character(3), " +
"character_arr character(3)[], " +
"character_varying character varying(20), " +
"character_varying_arr character varying(20)[], " +
"timestamp timestamp(6), " +
"timestamp_arr timestamp(6)[], " +
"timestamptz timestamptz, " +
"timestamptz_arr timestamptz[], " +
"date date, " +
"date_arr date[], " +
"time time(6), " +
"time_arr time(6)[]"
);
}
private static void createTable(PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
executeSQL(PostgresCatalog.DEFAULT_DATABASE, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
}
private static void createTable(String db, PostgresTablePath tablePath, String tableSchemaSql) throws SQLException {
executeSQL(db, String.format("CREATE TABLE %s(%s);", tablePath.getFullPath(), tableSchemaSql));
}
private static void createSchema(String db, String schema) throws SQLException {
executeSQL(db, String.format("CREATE SCHEMA %s", schema));
}
private static void createDatabase(String database) throws SQLException {
executeSQL(String.format("CREATE DATABASE %s;", database));
}
private static void executeSQL(String sql) throws SQLException {
executeSQL("", sql);
}
private static void executeSQL(String db, String sql) throws SQLException {
try (Connection conn = DriverManager.getConnection(baseUrl + db, TEST_USERNAME, TEST_PWD);
Statement statement = conn.createStatement()) {
statement.executeUpdate(sql);
} catch (SQLException e) {
throw e;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io.jdbc.catalog;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
/**
* Test for {@link PostgresTablePath}.
*/
public class PostgresTablePathTest {
@Test
public void testFromFlinkTableName() {
assertEquals(new PostgresTablePath("public", "topic"), PostgresTablePath.fromFlinkTableName("public.topic"));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册