未验证 提交 f5e829c4 编写于 作者: 肖佳文 提交者: Jark Wu

[FLINK-19691][jdbc] Support connection.max-retry-timeout configuration for JDBC connector

This closes #14387
上级 21a08ea5
......@@ -1301,6 +1301,9 @@ CREATE TABLE MyUserTable (
'connector.username' = 'name',
'connector.password' = 'password',
-- optional: jdbc connection max-retry-timeout
'connector.connection.max-retry-timeout' = '60s',
-- **followings are scan options, optional, used when reading from a table**
-- optional: SQL query / prepared statement.
......
......@@ -1301,6 +1301,9 @@ CREATE TABLE MyUserTable (
'connector.username' = 'name',
'connector.password' = 'password',
-- optional: jdbc connection max-retry-timeout
'connector.connection.max-retry-timeout' = '60s',
-- **followings are scan options, optional, used when reading from table**
-- These options must all be specified if any of them is specified. In addition,
......
......@@ -148,6 +148,13 @@ Connector Options
<td>String</td>
<td>The JDBC password.</td>
</tr>
<tr>
<td><h5>connection.max-retry-timeout</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">60s</td>
<td>Duration</td>
<td>Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second.</td>
</tr>
<tr>
<td><h5>scan.partition.column</h5></td>
<td>optional</td>
......
......@@ -148,6 +148,13 @@ Connector Options
<td>String</td>
<td>The JDBC password.</td>
</tr>
<tr>
<td><h5>connection.max-retry-timeout</h5></td>
<td>optional</td>
<td style="word-wrap: break-word;">60s</td>
<td>Duration</td>
<td>Maximum timeout between retries. The timeout should be in second granularity and shouldn't be smaller than 1 second.</td>
</tr>
<tr>
<td><h5>scan.partition.column</h5></td>
<td>optional</td>
......
......@@ -35,16 +35,19 @@ public class JdbcConnectionOptions implements Serializable {
protected final String url;
protected final String driverName;
protected final int connectionCheckTimeoutSeconds;
@Nullable
protected final String username;
@Nullable
protected final String password;
protected JdbcConnectionOptions(String url, String driverName, String username, String password) {
protected JdbcConnectionOptions(String url, String driverName, String username, String password, int connectionCheckTimeoutSeconds) {
Preconditions.checkArgument(connectionCheckTimeoutSeconds > 0);
this.url = Preconditions.checkNotNull(url, "jdbc url is empty");
this.driverName = Preconditions.checkNotNull(driverName, "driver name is empty");
this.username = username;
this.password = password;
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
}
public String getDbURL() {
......@@ -63,6 +66,10 @@ public class JdbcConnectionOptions implements Serializable {
return Optional.ofNullable(password);
}
public int getConnectionCheckTimeoutSeconds() {
return connectionCheckTimeoutSeconds;
}
/**
* Builder for {@link JdbcConnectionOptions}.
*/
......@@ -71,6 +78,7 @@ public class JdbcConnectionOptions implements Serializable {
private String driverName;
private String username;
private String password;
private int connectionCheckTimeoutSeconds = 60;
public JdbcConnectionOptionsBuilder withUrl(String url) {
this.url = url;
......@@ -92,8 +100,17 @@ public class JdbcConnectionOptions implements Serializable {
return this;
}
/**
* Set the maximum timeout between retries, default is 60 seconds.
* @param connectionCheckTimeoutSeconds the timeout seconds, shouldn't smaller than 1 second.
*/
public JdbcConnectionOptionsBuilder withConnectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
return this;
}
public JdbcConnectionOptions build() {
return new JdbcConnectionOptions(url, driverName, username, password);
return new JdbcConnectionOptions(url, driverName, username, password, connectionCheckTimeoutSeconds);
}
}
}
......@@ -49,7 +49,6 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static org.apache.flink.connector.jdbc.internal.options.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SECONDS;
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.setRecordToStatement;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -179,7 +178,7 @@ public class JdbcBatchingOutputFormat<In, JdbcIn, JdbcExec extends JdbcBatchStat
throw new IOException(e);
}
try {
if (!connection.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
if (!connectionProvider.isConnectionValid()){
connection = connectionProvider.reestablishConnection();
jdbcStatementExecutor.closeStatements();
jdbcStatementExecutor.prepareStatements(connection);
......
......@@ -29,4 +29,6 @@ public interface JdbcConnectionProvider {
Connection getConnection() throws Exception;
Connection reestablishConnection() throws Exception;
boolean isConnectionValid() throws Exception;
}
......@@ -85,4 +85,9 @@ public class SimpleJdbcConnectionProvider implements JdbcConnectionProvider, Ser
connection = getConnection();
return connection;
}
@Override
public boolean isConnectionValid() throws Exception {
return connection.isValid(jdbcOptions.getConnectionCheckTimeoutSeconds());
}
}
......@@ -36,15 +36,12 @@ public class JdbcOptions extends JdbcConnectionOptions {
private static final long serialVersionUID = 1L;
public static final int CONNECTION_CHECK_TIMEOUT_SECONDS = 60;
private String tableName;
private JdbcDialect dialect;
private final @Nullable Integer parallelism;
private JdbcOptions(String dbURL, String tableName, String driverName, String username,
String password, JdbcDialect dialect, Integer parallelism) {
super(dbURL, driverName, username, password);
private JdbcOptions(String dbURL, String tableName, String driverName, String username, String password, JdbcDialect dialect, Integer parallelism, int connectionCheckTimeoutSeconds) {
super(dbURL, driverName, username, password, connectionCheckTimeoutSeconds);
this.tableName = tableName;
this.dialect = dialect;
this.parallelism = parallelism;
......@@ -76,15 +73,17 @@ public class JdbcOptions extends JdbcConnectionOptions {
Objects.equals(username, options.username) &&
Objects.equals(password, options.password) &&
Objects.equals(dialect.getClass().getName(), options.dialect.getClass().getName()) &&
Objects.equals(parallelism, options.parallelism);
Objects.equals(parallelism, options.parallelism) &&
Objects.equals(connectionCheckTimeoutSeconds, options.connectionCheckTimeoutSeconds);
} else {
return false;
}
}
@Override
public int hashCode() {
return Objects.hash(url, tableName, driverName, username, password, dialect, parallelism);
return Objects.hash(url, tableName, driverName, username, password, dialect.getClass().getName(), parallelism, connectionCheckTimeoutSeconds);
}
/**
......@@ -98,6 +97,7 @@ public class JdbcOptions extends JdbcConnectionOptions {
private String password;
private JdbcDialect dialect;
private Integer parallelism;
private int connectionCheckTimeoutSeconds = 60;
/**
* required, table name.
......@@ -123,6 +123,14 @@ public class JdbcOptions extends JdbcConnectionOptions {
return this;
}
/**
* optional, connectionCheckTimeoutSeconds.
*/
public Builder setConnectionCheckTimeoutSeconds(int connectionCheckTimeoutSeconds) {
this.connectionCheckTimeoutSeconds = connectionCheckTimeoutSeconds;
return this;
}
/**
* optional, driver name, dialect has a default driver name,
* See {@link JdbcDialect#defaultDriverName}.
......@@ -170,7 +178,7 @@ public class JdbcOptions extends JdbcConnectionOptions {
});
}
return new JdbcOptions(dbURL, tableName, driverName, username, password, dialect, parallelism);
return new JdbcOptions(dbURL, tableName, driverName, username, password, dialect, parallelism, connectionCheckTimeoutSeconds);
}
}
}
......@@ -80,6 +80,11 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
.noDefaultValue()
.withDescription("the class name of the JDBC driver to use to connect to this URL. " +
"If not set, it will automatically be derived from the URL.");
public static final ConfigOption<Duration> MAX_RETRY_TIMEOUT = ConfigOptions
.key("connection.max-retry-timeout")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription("Maximum timeout between retries.");
// read config options
private static final ConfigOption<String> SCAN_PARTITION_COLUMN = ConfigOptions
......@@ -192,7 +197,8 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
.setDBUrl(url)
.setTableName(readableConfig.get(TABLE_NAME))
.setDialect(JdbcDialects.get(url).get())
.setParallelism(readableConfig.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null));
.setParallelism(readableConfig.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null))
.setConnectionCheckTimeoutSeconds((int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
readableConfig.getOptional(DRIVER).ifPresent(builder::setDriverName);
readableConfig.getOptional(USERNAME).ifPresent(builder::setUsername);
......@@ -274,6 +280,7 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
optionalOptions.add(SINK_BUFFER_FLUSH_INTERVAL);
optionalOptions.add(SINK_MAX_RETRIES);
optionalOptions.add(FactoryUtil.SINK_PARALLELISM);
optionalOptions.add(MAX_RETRY_TIMEOUT);
return optionalOptions;
}
......@@ -324,6 +331,13 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
SINK_MAX_RETRIES.key(),
config.get(SINK_MAX_RETRIES)));
}
if (config.get(MAX_RETRY_TIMEOUT).getSeconds() <= 0) {
throw new IllegalArgumentException(String.format(
"The value of '%s' option must be in second granularity and shouldn't be smaller than 1 second, but is %s.",
MAX_RETRY_TIMEOUT.key(),
config.get(ConfigOptions.key(MAX_RETRY_TIMEOUT.key()).stringType().noDefaultValue())));
}
}
private void checkAllOrNone(ReadableConfig config, ConfigOption<?>[] configOptions) {
......
......@@ -48,7 +48,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.connector.jdbc.internal.options.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SECONDS;
import static org.apache.flink.connector.jdbc.utils.JdbcUtils.getFieldFromResultSet;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -74,6 +73,7 @@ public class JdbcLookupFunction extends TableFunction<Row> {
private final String dbURL;
private final String username;
private final String password;
private final int connectionCheckTimeoutSeconds;
private final TypeInformation[] keyTypes;
private final int[] keySqlTypes;
private final String[] fieldNames;
......@@ -95,6 +95,7 @@ public class JdbcLookupFunction extends TableFunction<Row> {
this.dbURL = options.getDbURL();
this.username = options.getUsername().orElse(null);
this.password = options.getPassword().orElse(null);
this.connectionCheckTimeoutSeconds = options.getConnectionCheckTimeoutSeconds();
this.fieldNames = fieldNames;
this.fieldTypes = fieldTypes;
this.keyNames = keyNames;
......@@ -177,7 +178,7 @@ public class JdbcLookupFunction extends TableFunction<Row> {
}
try {
if (!dbConn.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
if (!dbConn.isValid(connectionCheckTimeoutSeconds)) {
statement.close();
dbConn.close();
establishConnectionAndStatement();
......
......@@ -50,7 +50,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.connector.jdbc.internal.options.JdbcOptions.CONNECTION_CHECK_TIMEOUT_SECONDS;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -68,6 +67,7 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
private final String dbURL;
private final String username;
private final String password;
private final int connectionCheckTimeoutSeconds;
private final DataType[] keyTypes;
private final String[] keyNames;
private final long cacheMaxSize;
......@@ -97,6 +97,7 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
this.username = options.getUsername().orElse(null);
this.password = options.getPassword().orElse(null);
this.keyNames = keyNames;
this.connectionCheckTimeoutSeconds = options.getConnectionCheckTimeoutSeconds();
List<String> nameList = Arrays.asList(fieldNames);
this.keyTypes = Arrays.stream(keyNames)
.map(s -> {
......@@ -175,7 +176,7 @@ public class JdbcRowDataLookupFunction extends TableFunction<RowData> {
}
try {
if (!dbConn.isValid(CONNECTION_CHECK_TIMEOUT_SECONDS)) {
if (!dbConn.isValid(connectionCheckTimeoutSeconds)) {
statement.close();
dbConn.close();
establishConnectionAndStatement();
......
......@@ -47,6 +47,7 @@ import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT;
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_DRIVER;
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_LOOKUP_CACHE_MAX_ROWS;
import static org.apache.flink.table.descriptors.JdbcValidator.CONNECTOR_LOOKUP_CACHE_TTL;
......@@ -95,6 +96,7 @@ public class JdbcTableSourceSinkFactory implements
properties.add(CONNECTOR_TABLE);
properties.add(CONNECTOR_USERNAME);
properties.add(CONNECTOR_PASSWORD);
properties.add(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT);
// scan options
properties.add(CONNECTOR_READ_QUERY);
......@@ -182,6 +184,8 @@ public class JdbcTableSourceSinkFactory implements
.setTableName(descriptorProperties.getString(CONNECTOR_TABLE))
.setDialect(JdbcDialects.get(url).get());
descriptorProperties.getOptionalDuration(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT).ifPresent(
s -> builder.setConnectionCheckTimeoutSeconds((int) s.getSeconds()));
descriptorProperties.getOptionalString(CONNECTOR_DRIVER).ifPresent(builder::setDriverName);
descriptorProperties.getOptionalString(CONNECTOR_USERNAME).ifPresent(builder::setUsername);
descriptorProperties.getOptionalString(CONNECTOR_PASSWORD).ifPresent(builder::setPassword);
......
......@@ -42,6 +42,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator {
public static final String CONNECTOR_DRIVER = "connector.driver";
public static final String CONNECTOR_USERNAME = "connector.username";
public static final String CONNECTOR_PASSWORD = "connector.password";
public static final String CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT = "connector.connection.max-retry-timeout";
public static final String CONNECTOR_READ_QUERY = "connector.read.query";
public static final String CONNECTOR_READ_PARTITION_COLUMN = "connector.read.partition.column";
......@@ -73,6 +74,7 @@ public class JdbcValidator extends ConnectorDescriptorValidator {
properties.validateString(CONNECTOR_DRIVER, true);
properties.validateString(CONNECTOR_USERNAME, true);
properties.validateString(CONNECTOR_PASSWORD, true);
properties.validateDuration(CONNECTOR_CONNECTION_MAX_RETRY_TIMEOUT, true, 1000);
final String url = properties.getString(CONNECTOR_URL);
final Optional<JdbcDialect> dialect = JdbcDialects.get(url);
......
......@@ -63,6 +63,7 @@ public class JdbcDynamicTableFactoryTest {
properties.put("driver", "org.apache.derby.jdbc.EmbeddedDriver");
properties.put("username", "user");
properties.put("password", "pass");
properties.put("connection.max-retry-timeout", "120s");
// validation for source
DynamicTableSource actualSource = createTableSource(properties);
......@@ -72,6 +73,7 @@ public class JdbcDynamicTableFactoryTest {
.setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
.setUsername("user")
.setPassword("pass")
.setConnectionCheckTimeoutSeconds(120)
.build();
JdbcLookupOptions lookupOptions = JdbcLookupOptions.builder()
.setCacheMaxSize(-1)
......@@ -339,6 +341,18 @@ public class JdbcDynamicTableFactoryTest {
"The value of 'sink.max-retries' option shouldn't be negative, but is -1.")
.isPresent());
}
// connection.max-retry-timeout shouldn't be smaller than 1 second
try {
Map<String, String> properties = getAllOptions();
properties.put("connection.max-retry-timeout", "100ms");
createTableSource(properties);
fail("exception expected");
} catch (Throwable t) {
assertTrue(ExceptionUtils.findThrowableWithMessage(t,
"The value of 'connection.max-retry-timeout' option must be in second granularity and shouldn't be smaller than 1 second, but is 100ms.")
.isPresent());
}
}
private Map<String, String> getAllOptions() {
......
......@@ -65,6 +65,7 @@ public class JdbcTableSourceSinkFactoryTest {
properties.put("connector.driver", "org.apache.derby.jdbc.EmbeddedDriver");
properties.put("connector.username", "user");
properties.put("connector.password", "pass");
properties.put("connector.connection.max-retry-timeout", "120s");
final StreamTableSource<?> actual = TableFactoryService.find(StreamTableSourceFactory.class, properties)
.createStreamTableSource(properties);
......@@ -75,6 +76,7 @@ public class JdbcTableSourceSinkFactoryTest {
.setDriverName("org.apache.derby.jdbc.EmbeddedDriver")
.setUsername("user")
.setPassword("pass")
.setConnectionCheckTimeoutSeconds(120)
.build();
final JdbcTableSource expected = JdbcTableSource.builder()
.setOptions(options)
......@@ -267,6 +269,17 @@ public class JdbcTableSourceSinkFactoryTest {
fail("exception expected");
} catch (ValidationException ignored) {
}
// connection.max-retry-timeout property is smaller than 1 second
try {
Map<String, String> properties = getBasicProperties();
properties.put("connector.connection.max-retry-timeout", "100ms");
TableFactoryService.find(StreamTableSourceFactory.class, properties)
.createStreamTableSource(properties);
fail("exception expected");
} catch (ValidationException ignored) {
}
}
private Map<String, String> getBasicProperties() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册