未验证 提交 0c36b666 编写于 作者: K Kezhu Wang 提交者: GitHub

[FLINK-20657][connectors/jdbc] Migrate jdbc InputFormat/LookupFunction to...

[FLINK-20657][connectors/jdbc] Migrate jdbc InputFormat/LookupFunction to SimpleJdbcConnectionProvider for connection establishment

This closes #14466
上级 a859e2cc
......@@ -27,6 +27,8 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplit;
......@@ -102,19 +104,15 @@ import java.util.Arrays;
@Experimental
public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {
protected static final long serialVersionUID = 1L;
protected static final long serialVersionUID = 2L;
protected static final Logger LOG = LoggerFactory.getLogger(JdbcInputFormat.class);
protected String username;
protected String password;
protected String drivername;
protected String dbURL;
protected JdbcConnectionProvider connectionProvider;
protected String queryTemplate;
protected int resultSetType;
protected int resultSetConcurrency;
protected RowTypeInfo rowTypeInfo;
protected transient Connection dbConn;
protected transient PreparedStatement statement;
protected transient ResultSet resultSet;
protected int fetchSize;
......@@ -141,18 +139,7 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> implements
public void openInputFormat() {
//called once per inputFormat (on open)
try {
// Load DriverManager first to avoid deadlock between DriverManager's
// static initialization block and specific driver class's static
// initialization block.
//
// See comments in SimpleJdbcConnectionProvider for more details.
DriverManager.getDrivers();
Class.forName(drivername);
if (username == null) {
dbConn = DriverManager.getConnection(dbURL);
} else {
dbConn = DriverManager.getConnection(dbURL, username, password);
}
Connection dbConn = connectionProvider.getOrEstablishConnection();
// set autoCommit mode only if it was explicitly configured.
// keep connection default otherwise.
......@@ -184,15 +171,7 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> implements
statement = null;
}
try {
if (dbConn != null) {
dbConn.close();
}
} catch (SQLException se) {
LOG.info("Inputformat couldn't be closed - " + se.getMessage());
} finally {
dbConn = null;
}
connectionProvider.closeConnection();
parameterValues = null;
}
......@@ -342,7 +321,7 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> implements
@VisibleForTesting
protected Connection getDbConn() {
return dbConn;
return connectionProvider.getConnection();
}
/**
......@@ -357,10 +336,11 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> implements
* Builder for {@link JdbcInputFormat}.
*/
public static class JdbcInputFormatBuilder {
private final JdbcConnectionOptions.JdbcConnectionOptionsBuilder connOptionsBuilder;
private final JdbcInputFormat format;
public JdbcInputFormatBuilder() {
this.connOptionsBuilder = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder();
this.format = new JdbcInputFormat();
//using TYPE_FORWARD_ONLY for high performance reads
this.format.resultSetType = ResultSet.TYPE_FORWARD_ONLY;
......@@ -368,22 +348,22 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> implements
}
public JdbcInputFormatBuilder setUsername(String username) {
format.username = username;
connOptionsBuilder.withUsername(username);
return this;
}
public JdbcInputFormatBuilder setPassword(String password) {
format.password = password;
connOptionsBuilder.withPassword(password);
return this;
}
public JdbcInputFormatBuilder setDrivername(String drivername) {
format.drivername = drivername;
connOptionsBuilder.withDriverName(drivername);
return this;
}
public JdbcInputFormatBuilder setDBUrl(String dbURL) {
format.dbURL = dbURL;
connOptionsBuilder.withUrl(dbURL);
return this;
}
......@@ -425,23 +405,12 @@ public class JdbcInputFormat extends RichInputFormat<Row, InputSplit> implements
}
public JdbcInputFormat finish() {
if (format.username == null) {
LOG.info("Username was not supplied separately.");
}
if (format.password == null) {
LOG.info("Password was not supplied separately.");
}
if (format.dbURL == null) {
throw new IllegalArgumentException("No database URL supplied");
}
format.connectionProvider = new SimpleJdbcConnectionProvider(connOptionsBuilder.build());
if (format.queryTemplate == null) {
throw new IllegalArgumentException("No query supplied");
}
if (format.drivername == null) {
throw new IllegalArgumentException("No driver supplied");
throw new NullPointerException("No query supplied");
}
if (format.rowTypeInfo == null) {
throw new IllegalArgumentException("No " + RowTypeInfo.class.getSimpleName() + " supplied");
throw new NullPointerException("No " + RowTypeInfo.class.getSimpleName() + " supplied");
}
if (format.parameterValues == null) {
LOG.debug("No input splitting configured (data will be read with parallelism 1).");
......
......@@ -30,7 +30,6 @@ import org.slf4j.LoggerFactory;
import java.io.Flushable;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
/**
* Base jdbc outputFormat.
......@@ -42,7 +41,6 @@ public abstract class AbstractJdbcOutputFormat<T> extends RichOutputFormat<T> im
public static final long DEFAULT_FLUSH_INTERVAL_MILLS = 0L;
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcOutputFormat.class);
protected transient Connection connection;
protected final JdbcConnectionProvider connectionProvider;
public AbstractJdbcOutputFormat(JdbcConnectionProvider connectionProvider) {
......@@ -56,31 +54,15 @@ public abstract class AbstractJdbcOutputFormat<T> extends RichOutputFormat<T> im
@Override
public void open(int taskNumber, int numTasks) throws IOException {
try {
establishConnection();
connectionProvider.getOrEstablishConnection();
} catch (Exception e) {
throw new IOException("unable to open JDBC writer", e);
}
}
protected void establishConnection() throws Exception {
connection = connectionProvider.getConnection();
}
@Override
public void close() {
closeDbConnection();
}
private void closeDbConnection() {
if (connection != null) {
try {
connection.close();
} catch (SQLException se) {
LOG.warn("JDBC connection could not be closed: " + se.getMessage());
} finally {
connection = null;
}
}
connectionProvider.closeConnection();
}
@Override
......@@ -89,6 +71,6 @@ public abstract class AbstractJdbcOutputFormat<T> extends RichOutputFormat<T> im
@VisibleForTesting
public Connection getConnection() {
return connection;
return connectionProvider.getConnection();
}
}
......@@ -41,6 +41,7 @@ import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.concurrent.Executors;
......@@ -131,7 +132,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
private JdbcExec createAndOpenStatementExecutor(StatementExecutorFactory<JdbcExec> statementExecutorFactory) throws IOException {
JdbcExec exec = statementExecutorFactory.apply(getRuntimeContext());
try {
exec.prepareStatements(connection);
exec.prepareStatements(connectionProvider.getConnection());
} catch (SQLException e) {
throw new IOException("unable to open JDBC writer", e);
}
......@@ -179,13 +180,13 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
}
try {
if (!connectionProvider.isConnectionValid()){
connection = connectionProvider.reestablishConnection();
jdbcStatementExecutor.closeStatements();
Connection connection = connectionProvider.reestablishConnection();
jdbcStatementExecutor.prepareStatements(connection);
}
} catch (Exception excpetion) {
LOG.error("JDBC connection is not valid, and reestablish connection failed.", excpetion);
throw new IOException("Reestablish JDBC connection failed", excpetion);
} catch (Exception exception) {
LOG.error("JDBC connection is not valid, and reestablish connection failed.", exception);
throw new IOException("Reestablish JDBC connection failed", exception);
}
try {
Thread.sleep(1000 * i);
......
......@@ -56,7 +56,7 @@ class TableJdbcUpsertOutputFormat extends JdbcBatchingOutputFormat<Tuple2<Boolea
super.open(taskNumber, numTasks);
deleteExecutor = createDeleteExecutor();
try {
deleteExecutor.prepareStatements(connection);
deleteExecutor.prepareStatements(connectionProvider.getConnection());
} catch (SQLException e) {
throw new IOException(e);
}
......
......@@ -19,16 +19,52 @@ package org.apache.flink.connector.jdbc.internal.connection;
import org.apache.flink.annotation.Internal;
import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.SQLException;
/**
* JDBC connection provider.
*/
@Internal
public interface JdbcConnectionProvider {
Connection getConnection() throws Exception;
/**
* Get existing connection.
*
* @return existing connection
*/
@Nullable
Connection getConnection();
/**
* Check whether possible existing connection is valid or not through {@link Connection#isValid(int)}.
*
* @return true if existing connection is valid
* @throws SQLException sql exception throw from {@link Connection#isValid(int)}
*/
boolean isConnectionValid() throws SQLException;
/**
* Get existing connection or establish an new one if there is none.
*
* @return existing connection or newly established connection
* @throws SQLException sql exception
* @throws ClassNotFoundException driver class not found
*/
Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException;
Connection reestablishConnection() throws Exception;
/**
* Close possible existing connection.
*/
void closeConnection();
boolean isConnectionValid() throws Exception;
/**
* Close possible existing connection and establish an new one.
*
* @return newly established connection
* @throws SQLException sql exception
* @throws ClassNotFoundException driver class not found
*/
Connection reestablishConnection() throws SQLException, ClassNotFoundException;
}
......@@ -22,6 +22,8 @@ import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.concurrent.NotThreadSafe;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
......@@ -30,6 +32,7 @@ import java.sql.SQLException;
/**
* Simple JDBC connection provider.
*/
@NotThreadSafe
public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Serializable {
private static final Logger LOG = LoggerFactory.getLogger(SimpleJdbcConnectionProvider.class);
......@@ -38,7 +41,7 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser
private final JdbcConnectionOptions jdbcOptions;
private transient volatile Connection connection;
private transient Connection connection;
static {
// Load DriverManager first to avoid deadlock between DriverManager's
......@@ -57,37 +60,44 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser
}
@Override
public Connection getConnection() throws SQLException, ClassNotFoundException {
public Connection getConnection() {
return connection;
}
@Override
public boolean isConnectionValid() throws SQLException {
return connection != null && connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
}
@Override
public Connection getOrEstablishConnection() throws SQLException, ClassNotFoundException {
if (connection == null) {
synchronized (this) {
if (connection == null) {
Class.forName(jdbcOptions.getDriverName());
if (jdbcOptions.getUsername().isPresent()) {
connection = DriverManager.getConnection(jdbcOptions.getDbURL(), jdbcOptions.getUsername().get(), jdbcOptions.getPassword().orElse(null));
} else {
connection = DriverManager.getConnection(jdbcOptions.getDbURL());
}
}
Class.forName(jdbcOptions.getDriverName());
if (jdbcOptions.getUsername().isPresent()) {
connection = DriverManager.getConnection(jdbcOptions.getDbURL(), jdbcOptions.getUsername().get(), jdbcOptions.getPassword().orElse(null));
} else {
connection = DriverManager.getConnection(jdbcOptions.getDbURL());
}
}
return connection;
}
@Override
public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
try {
connection.close();
} catch (SQLException e) {
LOG.info("JDBC connection close failed.", e);
} finally {
connection = null;
public void closeConnection() {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
LOG.warn("JDBC connection close failed.", e);
} finally {
connection = null;
}
}
connection = getConnection();
return connection;
}
@Override
public boolean isConnectionValid() throws Exception {
return connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
public Connection reestablishConnection() throws SQLException, ClassNotFoundException {
closeConnection();
return getOrEstablishConnection();
}
}
......@@ -21,6 +21,8 @@ package org.apache.flink.connector.jdbc.table;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl;
......@@ -38,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
......@@ -66,14 +67,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class JdbcLookupFunction extends TableFunction<Row> {
private static final Logger LOG = LoggerFactory.getLogger(JdbcLookupFunction.class);
private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 2L;
private final String query;
private final String drivername;
private final String dbURL;
private final String username;
private final String password;
private final int connectionCheckTimeoutSeconds;
private final JdbcConnectionProvider connectionProvider;
private final TypeInformation[] keyTypes;
private final int[] keySqlTypes;
private final String[] fieldNames;
......@@ -84,18 +81,13 @@ public class JdbcLookupFunction extends TableFunction<Row> {
private final long cacheExpireMs;
private final int maxRetryTimes;
private transient Connection dbConn;
private transient PreparedStatement statement;
private transient Cache<Row, List<Row>> cache;
public JdbcLookupFunction(
JdbcOptions options, JdbcLookupOptions lookupOptions,
String[] fieldNames, TypeInformation[] fieldTypes, String[] keyNames) {
this.drivername = options.getDriverName();
this.dbURL = options.getDbURL();
this.username = options.getUsername().orElse(null);
this.password = options.getPassword().orElse(null);
this.connectionCheckTimeoutSeconds = options.getConnectionCheckTimeoutSeconds();
this.connectionProvider = new SimpleJdbcConnectionProvider(options);
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
this.keyNames = keyNames;
......@@ -178,9 +170,9 @@ public class JdbcLookupFunction extends TableFunction<Row> {
}
try {
if (!dbConn.isValid(connectionCheckTimeoutSeconds)) {
if (!connectionProvider.isConnectionValid()) {
statement.close();
dbConn.close();
connectionProvider.closeConnection();
establishConnectionAndStatement();
}
} catch (SQLException | ClassNotFoundException excpetion) {
......@@ -206,18 +198,7 @@ public class JdbcLookupFunction extends TableFunction<Row> {
}
private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
// Load DriverManager first to avoid deadlock between DriverManager's
// static initialization block and specific driver class's static
// initialization block.
//
// See comments in SimpleJdbcConnectionProvider for more details.
DriverManager.getDrivers();
Class.forName(drivername);
if (username == null) {
dbConn = DriverManager.getConnection(dbURL);
} else {
dbConn = DriverManager.getConnection(dbURL, username, password);
}
Connection dbConn = connectionProvider.getOrEstablishConnection();
statement = dbConn.prepareStatement(query);
}
......@@ -237,20 +218,12 @@ public class JdbcLookupFunction extends TableFunction<Row> {
}
}
if (dbConn != null) {
try {
dbConn.close();
} catch (SQLException se) {
LOG.info("JDBC connection could not be closed: " + se.getMessage());
} finally {
dbConn = null;
}
}
connectionProvider.closeConnection();
}
@VisibleForTesting
public Connection getDbConnection() {
return dbConn;
return connectionProvider.getConnection();
}
@Override
......
......@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
......@@ -57,10 +58,10 @@ import java.util.Arrays;
@Internal
public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit> implements ResultTypeQueryable<RowData> {
private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 2L;
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataInputFormat.class);
private JdbcConnectionOptions connectionOptions;
private JdbcConnectionProvider connectionProvider;
private int fetchSize;
private Boolean autoCommit;
private Object[][] parameterValues;
......@@ -70,13 +71,12 @@ public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
private JdbcRowConverter rowConverter;
private TypeInformation<RowData> rowDataTypeInfo;
private transient Connection dbConn;
private transient PreparedStatement statement;
private transient ResultSet resultSet;
private transient boolean hasNext;
private JdbcRowDataInputFormat(
JdbcConnectionOptions connectionOptions,
JdbcConnectionProvider connectionProvider,
int fetchSize,
Boolean autoCommit,
Object[][] parameterValues,
......@@ -85,7 +85,7 @@ public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
int resultSetConcurrency,
JdbcRowConverter rowConverter,
TypeInformation<RowData> rowDataTypeInfo) {
this.connectionOptions = connectionOptions;
this.connectionProvider = connectionProvider;
this.fetchSize = fetchSize;
this.autoCommit = autoCommit;
this.parameterValues = parameterValues;
......@@ -105,7 +105,7 @@ public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
public void openInputFormat() {
//called once per inputFormat (on open)
try {
dbConn = new SimpleJdbcConnectionProvider(connectionOptions).getConnection();
Connection dbConn = connectionProvider.getOrEstablishConnection();
// set autoCommit mode only if it was explicitly configured.
// keep connection default otherwise.
if (autoCommit != null) {
......@@ -135,15 +135,7 @@ public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
statement = null;
}
try {
if (dbConn != null) {
dbConn.close();
}
} catch (SQLException se) {
LOG.info("Inputformat couldn't be closed - " + se.getMessage());
} finally {
dbConn = null;
}
connectionProvider.closeConnection();
parameterValues = null;
}
......@@ -380,16 +372,16 @@ public class JdbcRowDataInputFormat extends RichInputFormat<RowData, InputSplit>
public JdbcRowDataInputFormat build() {
if (this.queryTemplate == null) {
throw new IllegalArgumentException("No query supplied");
throw new NullPointerException("No query supplied");
}
if (this.rowConverter == null) {
throw new IllegalArgumentException("No row converter supplied");
throw new NullPointerException("No row converter supplied");
}
if (this.parameterValues == null) {
LOG.debug("No input splitting configured (data will be read with parallelism 1).");
}
return new JdbcRowDataInputFormat(
connOptionsBuilder.build(),
new SimpleJdbcConnectionProvider(connOptionsBuilder.build()),
this.fetchSize,
this.autoCommit,
this.parameterValues,
......
......@@ -22,6 +22,8 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialects;
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
......@@ -42,7 +44,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
......@@ -60,14 +61,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
private static final long serialVersionUID = 1L;
private static final long serialVersionUID = 2L;
private final String query;
private final String drivername;
private final String dbURL;
private final String username;
private final String password;
private final int connectionCheckTimeoutSeconds;
private final JdbcConnectionProvider connectionProvider;
private final DataType[] keyTypes;
private final String[] keyNames;
private final long cacheMaxSize;
......@@ -77,7 +74,6 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
private final JdbcRowConverter jdbcRowConverter;
private final JdbcRowConverter lookupKeyRowConverter;
private transient Connection dbConn;
private transient FieldNamedPreparedStatement statement;
private transient Cache<RowData, List<RowData>> cache;
......@@ -92,12 +88,8 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
checkNotNull(fieldNames, "No fieldNames supplied.");
checkNotNull(fieldTypes, "No fieldTypes supplied.");
checkNotNull(keyNames, "No keyNames supplied.");
this.drivername = options.getDriverName();
this.dbURL = options.getDbURL();
this.username = options.getUsername().orElse(null);
this.password = options.getPassword().orElse(null);
this.connectionProvider = new SimpleJdbcConnectionProvider(options);
this.keyNames = keyNames;
this.connectionCheckTimeoutSeconds = options.getConnectionCheckTimeoutSeconds();
List<String> nameList = Arrays.asList(fieldNames);
this.keyTypes = Arrays.stream(keyNames)
.map(s -> {
......@@ -111,6 +103,7 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
this.query = options.getDialect().getSelectFromStatement(
options.getTableName(), fieldNames, keyNames);
String dbURL = options.getDbURL();
this.jdbcDialect = JdbcDialects.get(dbURL)
.orElseThrow(() -> new UnsupportedOperationException(String.format("Unknown dbUrl:%s", dbURL)));
this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType);
......@@ -176,9 +169,9 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
}
try {
if (!dbConn.isValid(connectionCheckTimeoutSeconds)) {
if (!connectionProvider.isConnectionValid()) {
statement.close();
dbConn.close();
connectionProvider.closeConnection();
establishConnectionAndStatement();
}
} catch (SQLException | ClassNotFoundException excpetion) {
......@@ -196,18 +189,7 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
}
private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
// Load DriverManager first to avoid deadlock between DriverManager's
// static initialization block and specific driver class's static
// initialization block.
//
// See comments in SimpleJdbcConnectionProvider for more details.
DriverManager.getDrivers();
Class.forName(drivername);
if (username == null) {
dbConn = DriverManager.getConnection(dbURL);
} else {
dbConn = DriverManager.getConnection(dbURL, username, password);
}
Connection dbConn = connectionProvider.getOrEstablishConnection();
statement = FieldNamedPreparedStatement.prepareStatement(dbConn, query, keyNames);
}
......@@ -227,19 +209,11 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
}
}
if (dbConn != null) {
try {
dbConn.close();
} catch (SQLException se) {
LOG.info("JDBC connection could not be closed: " + se.getMessage());
} finally {
dbConn = null;
}
}
connectionProvider.closeConnection();
}
@VisibleForTesting
public Connection getDbConnection() {
return dbConn;
return connectionProvider.getConnection();
}
}
......@@ -26,7 +26,9 @@ import org.apache.flink.types.Row;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.io.Serializable;
......@@ -48,6 +50,9 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TestEntry;
*/
public class JdbcInputFormatTest extends JdbcDataTestBase {
@Rule
public ExpectedException thrown = ExpectedException.none();
private JdbcInputFormat jdbcInputFormat;
@After
......@@ -59,10 +64,12 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
jdbcInputFormat = null;
}
@Test(expected = IllegalArgumentException.class)
@Test
public void testUntypedRowInfo() throws IOException {
thrown.expect(NullPointerException.class);
thrown.expectMessage("No RowTypeInfo supplied");
jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
.setDrivername("org.apache.derby.jdbc.idontexist")
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.finish();
......@@ -102,8 +109,10 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
jdbcInputFormat.openInputFormat();
}
@Test(expected = IllegalArgumentException.class)
public void testIncompleteConfiguration() throws IOException {
@Test
public void testNoUrl() throws IOException {
thrown.expect(NullPointerException.class);
thrown.expectMessage("jdbc url is empty");
jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
.setQuery(SELECT_ALL_BOOKS)
......@@ -111,6 +120,17 @@ public class JdbcInputFormatTest extends JdbcDataTestBase {
.finish();
}
@Test
public void testNoQuery() throws IOException {
thrown.expect(NullPointerException.class);
thrown.expectMessage("No query supplied");
jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
.setRowTypeInfo(ROW_TYPE_INFO)
.finish();
}
@Test(expected = IllegalArgumentException.class)
public void testInvalidFetchSize() {
jdbcInputFormat = JdbcInputFormat.buildJdbcInputFormat()
......
......@@ -75,7 +75,7 @@ public class SimpleJdbcConnectionProviderDriverClassConcurrentLoadingTest {
public void go() throws Exception {
startLatch.await();
JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(options);
Connection connection = connectionProvider.getConnection();
Connection connection = connectionProvider.getOrEstablishConnection();
connection.close();
}
};
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.jdbc.internal.connection;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.fakedb.FakeDBUtils;
import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection;
import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection1;
import org.apache.flink.connector.jdbc.fakedb.driver.FakeConnection2;
import org.junit.Ignore;
import org.junit.Test;
import java.sql.Connection;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Test for {@link SimpleJdbcConnectionProvider}.
*/
public class SimpleJdbcConnectionProviderTest {
private static JdbcConnectionProvider newFakeConnectionProviderWithDriverName(String driverName) {
JdbcConnectionOptions options = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(FakeDBUtils.TEST_DB_URL)
.withDriverName(driverName)
.build();
return new SimpleJdbcConnectionProvider(options);
}
private static JdbcConnectionProvider newFakeConnectionProvider() {
return newFakeConnectionProviderWithDriverName(FakeDBUtils.DRIVER1_CLASS_NAME);
}
@Test
public void testEstablishConnection() throws Exception {
JdbcConnectionProvider provider = newFakeConnectionProvider();
assertNull(provider.getConnection());
assertFalse(provider.isConnectionValid());
Connection connection = provider.getOrEstablishConnection();
assertNotNull(connection);
assertFalse(connection.isClosed());
assertTrue(provider.isConnectionValid());
assertThat(connection, instanceOf(FakeConnection.class));
assertNotNull(provider.getConnection());
assertSame(connection, provider.getConnection());
assertSame(connection, provider.getOrEstablishConnection());
}
@Test
@Ignore("FLINK-20658")
public void testEstablishDriverConnection() throws Exception {
JdbcConnectionProvider provider1 = newFakeConnectionProviderWithDriverName(FakeDBUtils.DRIVER1_CLASS_NAME);
Connection connection1 = provider1.getOrEstablishConnection();
assertThat(connection1, instanceOf(FakeConnection1.class));
JdbcConnectionProvider provider2 = newFakeConnectionProviderWithDriverName(FakeDBUtils.DRIVER2_CLASS_NAME);
Connection connection2 = provider2.getOrEstablishConnection();
assertThat(connection2, instanceOf(FakeConnection2.class));
}
@Test
public void testCloseNullConnection() throws Exception {
JdbcConnectionProvider provider = newFakeConnectionProvider();
provider.closeConnection();
assertNull(provider.getConnection());
assertFalse(provider.isConnectionValid());
}
@Test
public void testCloseConnection() throws Exception {
JdbcConnectionProvider provider = newFakeConnectionProvider();
Connection connection1 = provider.getOrEstablishConnection();
provider.closeConnection();
assertNull(provider.getConnection());
assertFalse(provider.isConnectionValid());
assertTrue(connection1.isClosed());
Connection connection2 = provider.getOrEstablishConnection();
assertNotSame(connection1, connection2);
assertFalse(connection2.isClosed());
connection2.close();
assertNotNull(provider.getConnection());
assertFalse(provider.isConnectionValid());
}
@Test
public void testReestablishCachedConnection() throws Exception {
JdbcConnectionProvider provider = newFakeConnectionProvider();
Connection connection1 = provider.reestablishConnection();
assertNotNull(connection1);
assertFalse(connection1.isClosed());
assertSame(connection1, provider.getConnection());
assertSame(connection1, provider.getOrEstablishConnection());
Connection connection2 = provider.reestablishConnection();
assertNotNull(connection2);
assertFalse(connection2.isClosed());
assertSame(connection2, provider.getConnection());
assertSame(connection2, provider.getOrEstablishConnection());
assertTrue(connection1.isClosed());
assertNotSame(connection1, connection2);
}
}
......@@ -36,7 +36,9 @@ import org.apache.flink.table.types.logical.RowType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.io.Serializable;
......@@ -56,6 +58,9 @@ import static org.apache.flink.connector.jdbc.JdbcTestFixture.TEST_DATA;
*/
public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
@Rule
public ExpectedException thrown = ExpectedException.none();
private JdbcRowDataInputFormat inputFormat;
private static String[] fieldNames = new String[]{"id", "title", "author", "price", "qty"};
private static DataType[] fieldDataTypes = new DataType[]{
......@@ -84,10 +89,12 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
inputFormat = null;
}
@Test(expected = IllegalArgumentException.class)
public void testUntypedRowInfo() throws IOException {
@Test
public void testNoRowConverter() throws IOException {
thrown.expect(NullPointerException.class);
thrown.expectMessage("No row converter supplied");
inputFormat = JdbcRowDataInputFormat.builder()
.setDrivername("org.apache.derby.jdbc.idontexist")
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.build();
......@@ -100,6 +107,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
.setDrivername("org.apache.derby.jdbc.idontexist")
.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
.setQuery(SELECT_ALL_BOOKS)
.setRowConverter(dialect.getRowConverter(rowType))
.build();
inputFormat.openInputFormat();
}
......@@ -110,6 +118,7 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
.setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
.setQuery(SELECT_ALL_BOOKS)
.setRowConverter(dialect.getRowConverter(rowType))
.build();
inputFormat.openInputFormat();
}
......@@ -120,15 +129,30 @@ public class JdbcRowDataInputFormatTest extends JdbcDataTestBase {
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
.setQuery("iamnotsql")
.setRowConverter(dialect.getRowConverter(rowType))
.build();
inputFormat.openInputFormat();
}
@Test(expected = IllegalArgumentException.class)
public void testIncompleteConfiguration() throws IOException {
@Test
public void testNoQuery() throws IOException {
thrown.expect(NullPointerException.class);
thrown.expectMessage("No query supplied");
inputFormat = JdbcRowDataInputFormat.builder()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
.setDBUrl(DERBY_EBOOKSHOP_DB.getUrl())
.setRowConverter(dialect.getRowConverter(rowType))
.build();
}
@Test
public void testNoUrl() throws IOException {
thrown.expect(NullPointerException.class);
thrown.expectMessage("jdbc url is empty");
inputFormat = JdbcRowDataInputFormat.builder()
.setDrivername(DERBY_EBOOKSHOP_DB.getDriverClass())
.setQuery(SELECT_ALL_BOOKS)
.setRowConverter(dialect.getRowConverter(rowType))
.build();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册