提交 21a08ea5 编写于 作者: M meijie 提交者: Jark Wu

[FLINK-20552][jdbc] JdbcDynamicTableSink doesn't sink buffered data on checkpoint

This closes #14387
上级 2c92ea9a
......@@ -84,6 +84,14 @@ under the License.
<scope>test</scope>
</dependency>
<!-- Test-jar of flink-streaming-java: provides streaming test utilities
     (e.g. MockStreamingRuntimeContext used by the sink ITCase below)
     - test scope only. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- A planner dependency won't be necessary once FLIP-32 has been completed. -->
<dependency>
<groupId>org.apache.flink</groupId>
......
......@@ -21,12 +21,13 @@ package org.apache.flink.connector.jdbc.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.connector.jdbc.internal.options.JdbcDmlOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
......@@ -84,7 +85,7 @@ public class JdbcDynamicTableSink implements DynamicTableSink {
builder.setJdbcExecutionOptions(executionOptions);
builder.setRowDataTypeInfo(rowDataTypeInformation);
builder.setFieldDataTypes(tableSchema.getFieldDataTypes());
return OutputFormatProvider.of(builder.build(), jdbcOptions.getParallelism());
return SinkFunctionProvider.of(new GenericJdbcSinkFunction<>(builder.build()), jdbcOptions.getParallelism());
}
@Override
......
......@@ -19,17 +19,32 @@
package org.apache.flink.connector.jdbc.table;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.jdbc.JdbcTestFixture;
import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.runtime.utils.TestData;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
......@@ -45,7 +60,9 @@ import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.flink.connector.jdbc.JdbcTestFixture.DERBY_EBOOKSHOP_DB;
import static org.apache.flink.connector.jdbc.internal.JdbcTableOutputFormatTest.check;
......@@ -61,6 +78,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
public static final String OUTPUT_TABLE2 = "dynamicSinkForAppend";
public static final String OUTPUT_TABLE3 = "dynamicSinkForBatch";
public static final String OUTPUT_TABLE4 = "REAL_TABLE";
public static final String OUTPUT_TABLE5 = "checkpointTable";
public static final String USER_TABLE = "USER_TABLE";
@Before
......@@ -89,6 +107,9 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
stat.executeUpdate("CREATE TABLE " + OUTPUT_TABLE4 + " (real_data REAL)");
stat.executeUpdate("CREATE TABLE " + OUTPUT_TABLE5 + " (" +
"id BIGINT NOT NULL DEFAULT 0)");
stat.executeUpdate("CREATE TABLE " + USER_TABLE + " (" +
"user_id VARCHAR(20) NOT NULL," +
"user_name VARCHAR(20) NOT NULL," +
......@@ -110,6 +131,7 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
stat.execute("DROP TABLE " + OUTPUT_TABLE2);
stat.execute("DROP TABLE " + OUTPUT_TABLE3);
stat.execute("DROP TABLE " + OUTPUT_TABLE4);
stat.execute("DROP TABLE " + OUTPUT_TABLE5);
stat.execute("DROP TABLE " + USER_TABLE);
}
}
......@@ -316,4 +338,39 @@ public class JdbcDynamicTableSinkITCase extends AbstractTestBase {
Row.of("user4", "Tina", "tina@gmail.com", new BigDecimal("11.30"), new BigDecimal("22.60"))
}, DB_URL, USER_TABLE, new String[]{"user_id", "user_name", "email", "balance", "balance2"});
}
@Test
public void testFlushBufferWhenCheckpoint() throws Exception {
	// Connector options for a JDBC sink whose time-based flushing is
	// disabled ("sink.buffer-flush.interval" = 0), so buffered rows can
	// only reach the database via an explicit flush - i.e. a checkpoint.
	Map<String, String> options = new HashMap<>();
	options.put("connector", "jdbc");
	options.put("url", DB_URL);
	options.put("table-name", OUTPUT_TABLE5);
	options.put("sink.buffer-flush.interval", "0");

	TableSchema sinkTableSchema = TableSchema.builder()
		.field("id", DataTypes.BIGINT().notNull())
		.build();

	DynamicTableSink tableSink = FactoryUtil.createTableSink(
		null,
		ObjectIdentifier.of("default", "default", "checkpoint_sink"),
		new CatalogTableImpl(sinkTableSchema, options, "mock sink"),
		new Configuration(),
		this.getClass().getClassLoader(),
		false
	);

	// After FLINK-20552 the sink must be a SinkFunction (not an
	// OutputFormat), otherwise buffered data would not be flushed on
	// checkpoint; the casts below also assert that wiring.
	SinkRuntimeProviderContext context = new SinkRuntimeProviderContext(false);
	SinkFunctionProvider sinkProvider = (SinkFunctionProvider) tableSink.getSinkRuntimeProvider(context);
	GenericJdbcSinkFunction<RowData> sinkFunction = (GenericJdbcSinkFunction<RowData>) sinkProvider.createSinkFunction();

	sinkFunction.setRuntimeContext(new MockStreamingRuntimeContext(true, 1, 0));
	sinkFunction.open(new Configuration());
	// Close the function even when an assertion fails mid-test, so the
	// underlying JDBC resources do not leak into subsequent tests.
	try {
		// Two rows enter the buffer ...
		sinkFunction.invoke(GenericRowData.of(1L), SinkContextUtil.forTimestamp(1));
		sinkFunction.invoke(GenericRowData.of(2L), SinkContextUtil.forTimestamp(1));
		// ... and must not yet be visible in the database.
		check(new Row[]{}, DB_URL, OUTPUT_TABLE5, new String[]{"id"});
		// Taking a state snapshot (checkpoint) must flush the buffer.
		sinkFunction.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
		check(new Row[]{Row.of(1L), Row.of(2L)}, DB_URL, OUTPUT_TABLE5, new String[]{"id"});
	} finally {
		sinkFunction.close();
	}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册