Unverified commit ca46672f authored by Danny Chan, committed by Jark Wu

[FLINK-17026][kafka] Introduce a new Kafka connector with new property keys

This closes #12150
Parent 10a65ed2
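For context, the new property keys introduced by this change are defined in KafkaOptions further down in the diff ('topic', 'properties.bootstrap.servers', 'properties.group.id', 'scan.startup.mode', 'scan.startup.specific-offsets', 'scan.startup.timestamp-millis', 'sink.partitioner'), and the new factories register the identifiers "kafka-0.10" and "kafka-0.11". The following is a minimal sketch of how a table might be declared against this connector through the Table API; the class, table name, schema, topic, broker address, group id and the 'json' format are illustrative assumptions only, and the generic 'connector' and 'format' keys come from FactoryUtil rather than this diff.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical example class, not part of this commit. */
public class KafkaDdlSketch {

    public static void main(String[] args) {
        // A streaming TableEnvironment; the planner/settings here are illustrative.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // DDL exercising the new property keys; 'connector' = 'kafka-0.11'
        // matches Kafka011DynamicTableFactory.IDENTIFIER introduced in this commit.
        tEnv.executeSql(
                "CREATE TABLE user_behavior (\n" +
                "  user_id BIGINT,\n" +
                "  item_id BIGINT,\n" +
                "  ts TIMESTAMP(3)\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka-0.11',\n" +
                "  'topic' = 'user_behavior',\n" +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'test-group',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'json'\n" +
                ")");
    }
}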
@@ -171,6 +171,23 @@ under the License.
<scope>test</scope>
</dependency>
<!-- Kafka table factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Optional;
import java.util.Properties;
/**
* Kafka 0.10 table sink for writing data into Kafka.
*/
@Internal
public class Kafka010DynamicSink extends KafkaDynamicSinkBase {
public Kafka010DynamicSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
super(
consumedDataType,
topic,
properties,
partitioner,
sinkFormat);
}
@Override
protected FlinkKafkaProducerBase<RowData> createKafkaProducer(
String topic,
Properties properties,
SerializationSchema<RowData> serializationSchema,
Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
return new FlinkKafkaProducer010<>(
topic,
serializationSchema,
properties,
partitioner.orElse(null));
}
@Override
public DynamicTableSink copy() {
return new Kafka010DynamicSink(
this.consumedDataType,
this.topic,
this.properties,
this.partitioner,
this.sinkFormat);
}
@Override
public String asSummaryString() {
return "Kafka 0.10 table sink";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*/
@Internal
public class Kafka010DynamicSource extends KafkaDynamicSourceBase {
/**
* Creates a Kafka 0.10 {@link StreamTableSource}.
*
* @param outputDataType Source output data type
* @param topic Kafka topic to consume
* @param properties Properties for the Kafka consumer
* @param scanFormat Scan format for decoding records from Kafka
* @param startupMode Startup mode for the contained consumer
* @param specificStartupOffsets Specific startup offsets; only relevant when startup
* mode is {@link StartupMode#SPECIFIC_OFFSETS}
* @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
* mode is {@link StartupMode#TIMESTAMP}
*/
public Kafka010DynamicSource(
DataType outputDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
super(
outputDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestampMillis);
}
@Override
protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<RowData> deserializationSchema) {
return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
}
@Override
public DynamicTableSource copy() {
return new Kafka010DynamicSource(
this.outputDataType,
this.topic,
this.properties,
this.scanFormat,
this.startupMode,
this.specificStartupOffsets,
this.startupTimestampMillis);
}
@Override
public String asSummaryString() {
return "Kafka-0.10";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
/**
* Factory for creating configured instances of {@link Kafka010DynamicSource}.
*/
public class Kafka010DynamicTableFactory extends KafkaDynamicTableFactoryBase {
public static final String IDENTIFIER = "kafka-0.10";
@Override
protected KafkaDynamicSourceBase createKafkaTableSource(
DataType producedDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
return new Kafka010DynamicSource(
producedDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestampMillis);
}
@Override
protected KafkaDynamicSinkBase createKafkaTableSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
return new Kafka010DynamicSink(
consumedDataType,
topic,
properties,
partitioner,
sinkFormat);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.streaming.connectors.kafka.table.Kafka010DynamicTableFactory
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
/**
* Test for {@link Kafka010DynamicSource} and {@link Kafka010DynamicSink} created
* by {@link Kafka010DynamicTableFactory}.
*/
public class Kafka010DynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBase {
@Override
protected String factoryIdentifier() {
return Kafka010DynamicTableFactory.IDENTIFIER;
}
@Override
protected Class<?> getExpectedConsumerClass() {
return FlinkKafkaConsumer010.class;
}
@Override
protected Class<?> getExpectedProducerClass() {
return FlinkKafkaProducer010.class;
}
@Override
protected KafkaDynamicSourceBase getExpectedScanSource(
DataType producedDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestamp) {
return new Kafka010DynamicSource(
producedDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestamp);
}
@Override
protected KafkaDynamicSinkBase getExpectedSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
return new Kafka010DynamicSink(
consumedDataType,
topic,
properties,
partitioner,
sinkFormat);
}
}
@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.table.descriptors.KafkaValidator;
@@ -25,6 +25,11 @@ import org.apache.flink.table.descriptors.KafkaValidator;
*/
public class Kafka010TableITCase extends KafkaTableTestBase {
@Override
public String factoryIdentifier() {
return Kafka010DynamicTableFactory.IDENTIFIER;
}
@Override
public String kafkaVersion() {
return KafkaValidator.CONNECTOR_VERSION_VALUE_010;
......
@@ -179,6 +179,15 @@ under the License.
<scope>test</scope>
</dependency>
<!-- Kafka table factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Optional;
import java.util.Properties;
/**
* Kafka 0.11 table sink for writing data into Kafka.
*/
@Internal
public class Kafka011DynamicSink extends KafkaDynamicSinkBase {
public Kafka011DynamicSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
super(
consumedDataType,
topic,
properties,
partitioner,
sinkFormat);
}
@Override
protected SinkFunction<RowData> createKafkaProducer(
String topic,
Properties properties,
SerializationSchema<RowData> serializationSchema,
Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
new KeyedSerializationSchemaWrapper<>(serializationSchema),
properties,
partitioner);
}
@Override
public DynamicTableSink copy() {
return new Kafka011DynamicSink(
this.consumedDataType,
this.topic,
this.properties,
this.partitioner,
this.sinkFormat);
}
@Override
public String asSummaryString() {
return "Kafka 0.11 table sink";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
*/
@Internal
public class Kafka011DynamicSource extends KafkaDynamicSourceBase {
/**
* Creates a Kafka 0.11 {@link org.apache.flink.table.connector.source.ScanTableSource}.
*
* @param outputDataType Source output data type
* @param topic Kafka topic to consume
* @param properties Properties for the Kafka consumer
* @param scanFormat Scan format for decoding records from Kafka
* @param startupMode Startup mode for the contained consumer
* @param specificStartupOffsets Specific startup offsets; only relevant when startup
* mode is {@link StartupMode#SPECIFIC_OFFSETS}
* @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
* mode is {@link StartupMode#TIMESTAMP}
*/
public Kafka011DynamicSource(
DataType outputDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
super(
outputDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestampMillis);
}
@Override
protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<RowData> deserializationSchema) {
return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
}
@Override
public DynamicTableSource copy() {
return new Kafka011DynamicSource(
this.outputDataType,
this.topic,
this.properties,
this.scanFormat,
this.startupMode,
this.specificStartupOffsets,
this.startupTimestampMillis);
}
@Override
public String asSummaryString() {
return "Kafka-0.11";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
/**
* Factory for creating configured instances of {@link Kafka011DynamicSource}.
*/
public class Kafka011DynamicTableFactory extends KafkaDynamicTableFactoryBase {
public static final String IDENTIFIER = "kafka-0.11";
@Override
protected KafkaDynamicSourceBase createKafkaTableSource(
DataType producedDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
return new Kafka011DynamicSource(
producedDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestampMillis);
}
@Override
protected KafkaDynamicSinkBase createKafkaTableSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
return new Kafka011DynamicSink(
consumedDataType,
topic,
properties,
partitioner,
sinkFormat);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.streaming.connectors.kafka.table.Kafka011DynamicTableFactory
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
/**
* Test for {@link Kafka011DynamicSource} and {@link Kafka011DynamicSink} created
* by {@link Kafka011DynamicTableFactory}.
*/
public class Kafka011DynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBase {
@Override
protected String factoryIdentifier() {
return Kafka011DynamicTableFactory.IDENTIFIER;
}
@Override
protected Class<?> getExpectedConsumerClass() {
return FlinkKafkaConsumer011.class;
}
@Override
protected Class<?> getExpectedProducerClass() {
return FlinkKafkaProducer011.class;
}
@Override
protected KafkaDynamicSourceBase getExpectedScanSource(
DataType producedDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestamp) {
return new Kafka011DynamicSource(
producedDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestamp);
}
@Override
protected KafkaDynamicSinkBase getExpectedSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
return new Kafka011DynamicSink(
consumedDataType,
topic,
properties,
partitioner,
sinkFormat);
}
}
@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.table.descriptors.KafkaValidator;
@@ -25,6 +25,11 @@ import org.apache.flink.table.descriptors.KafkaValidator;
*/
public class Kafka011TableITCase extends KafkaTableTestBase {
@Override
public String factoryIdentifier() {
return Kafka011DynamicTableFactory.IDENTIFIER;
}
@Override
public String kafkaVersion() {
return KafkaValidator.CONNECTOR_VERSION_VALUE_011;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
/**
* A version-agnostic Kafka {@link DynamicTableSink}.
*
* <p>The version-specific Kafka producers need to extend this class and
* override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}.
*/
@Internal
public abstract class KafkaDynamicSinkBase implements DynamicTableSink {
/** Consumed data type of the table. */
protected final DataType consumedDataType;
/** The Kafka topic to write to. */
protected final String topic;
/** Properties for the Kafka producer. */
protected final Properties properties;
/** Sink format for encoding records to Kafka. */
protected final SinkFormat<SerializationSchema<RowData>> sinkFormat;
/** Partitioner to select Kafka partition for each item. */
protected final Optional<FlinkKafkaPartitioner<RowData>> partitioner;
protected KafkaDynamicSinkBase(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
this.consumedDataType = Preconditions.checkNotNull(consumedDataType, "Consumed data type must not be null.");
this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
this.sinkFormat = Preconditions.checkNotNull(sinkFormat, "Sink format must not be null.");
}
@Override
public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
return this.sinkFormat.getChangelogMode();
}
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
SerializationSchema<RowData> serializationSchema =
this.sinkFormat.createSinkFormat(context, this.consumedDataType);
final SinkFunction<RowData> kafkaProducer = createKafkaProducer(
this.topic,
properties,
serializationSchema,
this.partitioner);
return SinkFunctionProvider.of(kafkaProducer);
}
/**
* Returns the version-specific Kafka producer.
*
* @param topic Kafka topic to produce to.
* @param properties Properties for the Kafka producer.
* @param serializationSchema Serialization schema to use to create Kafka records.
* @param partitioner Partitioner to select Kafka partition.
* @return The version-specific Kafka producer
*/
protected abstract SinkFunction<RowData> createKafkaProducer(
String topic,
Properties properties,
SerializationSchema<RowData> serializationSchema,
Optional<FlinkKafkaPartitioner<RowData>> partitioner);
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final KafkaDynamicSinkBase that = (KafkaDynamicSinkBase) o;
return Objects.equals(consumedDataType, that.consumedDataType) &&
Objects.equals(topic, that.topic) &&
Objects.equals(properties, that.properties) &&
Objects.equals(sinkFormat, that.sinkFormat) &&
Objects.equals(partitioner, that.partitioner);
}
@Override
public int hashCode() {
return Objects.hash(
consumedDataType,
topic,
properties,
sinkFormat,
partitioner);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.Preconditions;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
/**
* A version-agnostic Kafka {@link ScanTableSource}.
*
* <p>The version-specific Kafka consumers need to extend this class and
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
*/
@Internal
public abstract class KafkaDynamicSourceBase implements ScanTableSource {
// --------------------------------------------------------------------------------------------
// Common attributes
// --------------------------------------------------------------------------------------------
protected final DataType outputDataType;
// --------------------------------------------------------------------------------------------
// Scan format attributes
// --------------------------------------------------------------------------------------------
/** Scan format for decoding records from Kafka. */
protected final ScanFormat<DeserializationSchema<RowData>> scanFormat;
// --------------------------------------------------------------------------------------------
// Kafka-specific attributes
// --------------------------------------------------------------------------------------------
/** The Kafka topic to consume. */
protected final String topic;
/** Properties for the Kafka consumer. */
protected final Properties properties;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
protected final StartupMode startupMode;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
protected final Map<KafkaTopicPartition, Long> specificStartupOffsets;
/** The start timestamp to locate partition offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}.*/
protected final long startupTimestampMillis;
/** The default value when startup timestamp is not used.*/
private static final long DEFAULT_STARTUP_TIMESTAMP_MILLIS = 0L;
/**
* Creates a generic Kafka {@link StreamTableSource}.
*
* @param outputDataType Source produced data type
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param scanFormat Scan format for decoding records from Kafka.
* @param startupMode Startup mode for the contained consumer.
* @param specificStartupOffsets Specific startup offsets; only relevant when startup
* mode is {@link StartupMode#SPECIFIC_OFFSETS}.
* @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
* mode is {@link StartupMode#TIMESTAMP}.
*/
protected KafkaDynamicSourceBase(
DataType outputDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
this.outputDataType = Preconditions.checkNotNull(
outputDataType, "Produced data type must not be null.");
this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
this.scanFormat = Preconditions.checkNotNull(
scanFormat, "Scan format must not be null.");
this.startupMode = Preconditions.checkNotNull(startupMode, "Startup mode must not be null.");
this.specificStartupOffsets = Preconditions.checkNotNull(
specificStartupOffsets, "Specific offsets must not be null.");
this.startupTimestampMillis = startupTimestampMillis;
}
@Override
public ChangelogMode getChangelogMode() {
return this.scanFormat.getChangelogMode();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(Context runtimeProviderContext) {
DeserializationSchema<RowData> deserializationSchema =
this.scanFormat.createScanFormat(runtimeProviderContext, this.outputDataType);
// Version-specific Kafka consumer
FlinkKafkaConsumerBase<RowData> kafkaConsumer =
getKafkaConsumer(topic, properties, deserializationSchema);
return SourceFunctionProvider.of(kafkaConsumer, false);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final KafkaDynamicSourceBase that = (KafkaDynamicSourceBase) o;
return Objects.equals(outputDataType, that.outputDataType) &&
Objects.equals(topic, that.topic) &&
Objects.equals(properties, that.properties) &&
Objects.equals(scanFormat, that.scanFormat) &&
startupMode == that.startupMode &&
Objects.equals(specificStartupOffsets, that.specificStartupOffsets) &&
startupTimestampMillis == that.startupTimestampMillis;
}
@Override
public int hashCode() {
return Objects.hash(
outputDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestampMillis);
}
// --------------------------------------------------------------------------------------------
// Abstract methods for subclasses
// --------------------------------------------------------------------------------------------
/**
* Creates a version-specific Kafka consumer.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @return The version-specific Kafka consumer
*/
protected abstract FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<RowData> deserializationSchema);
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
/**
* Returns a version-specific Kafka consumer with the start position configured.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param deserializationSchema Deserialization schema to use for Kafka records.
* @return The version-specific Kafka consumer
*/
protected FlinkKafkaConsumerBase<RowData> getKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<RowData> deserializationSchema) {
FlinkKafkaConsumerBase<RowData> kafkaConsumer =
createKafkaConsumer(topic, properties, deserializationSchema);
switch (startupMode) {
case EARLIEST:
kafkaConsumer.setStartFromEarliest();
break;
case LATEST:
kafkaConsumer.setStartFromLatest();
break;
case GROUP_OFFSETS:
kafkaConsumer.setStartFromGroupOffsets();
break;
case SPECIFIC_OFFSETS:
kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
break;
case TIMESTAMP:
kafkaConsumer.setStartFromTimestamp(startupTimestampMillis);
break;
}
return kafkaConsumer;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.SerializationFormatFactory;
import org.apache.flink.table.types.DataType;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_BOOTSTRAP_SERVERS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.PROPS_GROUP_ID;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SCAN_STARTUP_TIMESTAMP_MILLIS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.SINK_PARTITIONER;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.TOPIC;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getFlinkKafkaPartitioner;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getKafkaProperties;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.getStartupOptions;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaOptions.validateTableOptions;
/**
* Factory for creating configured instances of
* {@link KafkaDynamicSourceBase} and {@link KafkaDynamicSinkBase}.
*/
public abstract class KafkaDynamicTableFactoryBase implements
DynamicTableSourceFactory,
DynamicTableSinkFactory {
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
ReadableConfig tableOptions = helper.getOptions();
String topic = tableOptions.get(TOPIC);
ScanFormat<DeserializationSchema<RowData>> scanFormat = helper.discoverScanFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
// Validate the option data type.
helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
// Validate the option values.
validateTableOptions(tableOptions);
DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
final KafkaOptions.StartupOptions startupOptions = getStartupOptions(tableOptions, topic);
return createKafkaTableSource(
producedDataType,
topic,
getKafkaProperties(context.getCatalogTable().getOptions()),
scanFormat,
startupOptions.startupMode,
startupOptions.specificOffsets,
startupOptions.startupTimestampMillis);
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
ReadableConfig tableOptions = helper.getOptions();
String topic = tableOptions.get(TOPIC);
SinkFormat<SerializationSchema<RowData>> sinkFormat = helper.discoverSinkFormat(
SerializationFormatFactory.class,
FactoryUtil.FORMAT);
// Validate the option data type.
helper.validateExcept(KafkaOptions.PROPERTIES_PREFIX);
// Validate the option values.
validateTableOptions(tableOptions);
DataType consumedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
return createKafkaTableSink(
consumedDataType,
topic,
getKafkaProperties(context.getCatalogTable().getOptions()),
getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()),
sinkFormat);
}
/**
* Constructs the version-specific Kafka table source.
*
* @param producedDataType Source produced data type
* @param topic Kafka topic to consume
* @param properties Properties for the Kafka consumer
* @param scanFormat Scan format for decoding records from Kafka
* @param startupMode Startup mode for the contained consumer
* @param specificStartupOffsets Specific startup offsets; only relevant when startup
* mode is {@link StartupMode#SPECIFIC_OFFSETS}
* @param startupTimestampMillis Startup timestamp for offsets; only relevant when startup
* mode is {@link StartupMode#TIMESTAMP}
*/
protected abstract KafkaDynamicSourceBase createKafkaTableSource(
DataType producedDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis);
/**
* Constructs the version-specific Kafka table sink.
*
* @param consumedDataType Sink consumed data type
* @param topic Kafka topic to write to
* @param properties Properties for the Kafka producer
* @param partitioner Partitioner to select Kafka partition for each item
* @param sinkFormat Sink format for encoding records to Kafka
*/
protected abstract KafkaDynamicSinkBase createKafkaTableSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat);
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(TOPIC);
options.add(FactoryUtil.FORMAT);
options.add(PROPS_BOOTSTRAP_SERVERS);
return options;
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(PROPS_GROUP_ID);
options.add(SCAN_STARTUP_MODE);
options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
options.add(SINK_PARTITIONER);
return options;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
/** Option utils for the Kafka table source and sink. */
public class KafkaOptions {
private KafkaOptions() {}
// --------------------------------------------------------------------------------------------
// Kafka specific options
// --------------------------------------------------------------------------------------------
public static final ConfigOption<String> TOPIC = ConfigOptions
.key("topic")
.stringType()
.noDefaultValue()
.withDescription("Required topic name from which the table is read");
public static final ConfigOption<String> PROPS_BOOTSTRAP_SERVERS = ConfigOptions
.key("properties.bootstrap.servers")
.stringType()
.noDefaultValue()
.withDescription("Required Kafka server connection string");
public static final ConfigOption<String> PROPS_GROUP_ID = ConfigOptions
.key("properties.group.id")
.stringType()
.noDefaultValue()
.withDescription("Required consumer group in Kafka consumer, no need for Kafka producer");
// --------------------------------------------------------------------------------------------
// Scan specific options
// --------------------------------------------------------------------------------------------
public static final ConfigOption<String> SCAN_STARTUP_MODE = ConfigOptions
.key("scan.startup.mode")
.stringType()
.defaultValue("group-offsets")
.withDescription("Optional startup mode for Kafka consumer, valid enumerations are "
+ "\"earliest-offset\", \"latest-offset\", \"group-offsets\", \"timestamp\"\n"
+ "or \"specific-offsets\"");
public static final ConfigOption<String> SCAN_STARTUP_SPECIFIC_OFFSETS = ConfigOptions
.key("scan.startup.specific-offsets")
.stringType()
.noDefaultValue()
.withDescription("Optional offsets used in case of \"specific-offsets\" startup mode");
public static final ConfigOption<Long> SCAN_STARTUP_TIMESTAMP_MILLIS = ConfigOptions
.key("scan.startup.timestamp-millis")
.longType()
.noDefaultValue()
.withDescription("Optional timestamp used in case of \"timestamp\" startup mode");
// --------------------------------------------------------------------------------------------
// Sink specific options
// --------------------------------------------------------------------------------------------
public static final ConfigOption<String> SINK_PARTITIONER = ConfigOptions
.key("sink.partitioner")
.stringType()
.noDefaultValue()
.withDescription("Optional output partitioning from Flink's partitions\n"
+ "into Kafka's partitions valid enumerations are\n"
+ "\"fixed\": (each Flink partition ends up in at most one Kafka partition),\n"
+ "\"round-robin\": (a Flink partition is distributed to Kafka partitions round-robin)\n"
+ "\"custom class name\": (use a custom FlinkKafkaPartitioner subclass)");
// --------------------------------------------------------------------------------------------
// Option enumerations
// --------------------------------------------------------------------------------------------
// Start up offset.
public static final String SCAN_STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
public static final String SCAN_STARTUP_MODE_VALUE_LATEST = "latest-offset";
public static final String SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
public static final String SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
public static final String SCAN_STARTUP_MODE_VALUE_TIMESTAMP = "timestamp";
private static final Set<String> SCAN_STARTUP_MODE_ENUMS = new HashSet<>(Arrays.asList(
SCAN_STARTUP_MODE_VALUE_EARLIEST,
SCAN_STARTUP_MODE_VALUE_LATEST,
SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS,
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS,
SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
// Sink partitioner.
public static final String SINK_PARTITIONER_VALUE_FIXED = "fixed";
public static final String SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin";
private static final Set<String> SINK_PARTITIONER_ENUMS = new HashSet<>(Arrays.asList(
SINK_PARTITIONER_VALUE_FIXED,
SINK_PARTITIONER_VALUE_ROUND_ROBIN));
// Prefix for Kafka specific properties.
public static final String PROPERTIES_PREFIX = "properties.";
// Other keywords.
private static final String PARTITION = "partition";
private static final String OFFSET = "offset";
// --------------------------------------------------------------------------------------------
// Validation
// --------------------------------------------------------------------------------------------
public static void validateTableOptions(ReadableConfig tableOptions) {
validateScanStartupMode(tableOptions);
validateSinkPartitioner(tableOptions);
}
private static void validateScanStartupMode(ReadableConfig tableOptions) {
tableOptions.getOptional(SCAN_STARTUP_MODE)
.map(String::toLowerCase)
.ifPresent(mode -> {
if (!SCAN_STARTUP_MODE_ENUMS.contains(mode)) {
throw new ValidationException(
String.format("Invalid value for option '%s'. Supported values are %s, but was: %s",
SCAN_STARTUP_MODE.key(),
"[earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp]",
mode));
}
if (mode.equals(SCAN_STARTUP_MODE_VALUE_TIMESTAMP)) {
if (!tableOptions.getOptional(SCAN_STARTUP_TIMESTAMP_MILLIS).isPresent()) {
throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+ " but missing.",
SCAN_STARTUP_TIMESTAMP_MILLIS.key(),
SCAN_STARTUP_MODE_VALUE_TIMESTAMP));
}
}
if (mode.equals(SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS)) {
if (!tableOptions.getOptional(SCAN_STARTUP_SPECIFIC_OFFSETS).isPresent()) {
throw new ValidationException(String.format("'%s' is required in '%s' startup mode"
+ " but missing.",
SCAN_STARTUP_SPECIFIC_OFFSETS.key(),
SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS));
}
String specificOffsets = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
parseSpecificOffsets(specificOffsets, SCAN_STARTUP_SPECIFIC_OFFSETS.key());
}
});
}
private static void validateSinkPartitioner(ReadableConfig tableOptions) {
tableOptions.getOptional(SINK_PARTITIONER)
.ifPresent(partitioner -> {
if (!SINK_PARTITIONER_ENUMS.contains(partitioner.toLowerCase())) {
if (partitioner.isEmpty()) {
throw new ValidationException(
String.format("Option '%s' should be a non-empty string.",
SINK_PARTITIONER.key()));
}
}
});
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
public static StartupOptions getStartupOptions(
ReadableConfig tableOptions,
String topic) {
final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
final StartupMode startupMode = tableOptions.getOptional(SCAN_STARTUP_MODE)
.map(modeString -> {
switch (modeString) {
case SCAN_STARTUP_MODE_VALUE_EARLIEST:
return StartupMode.EARLIEST;
case SCAN_STARTUP_MODE_VALUE_LATEST:
return StartupMode.LATEST;
case SCAN_STARTUP_MODE_VALUE_GROUP_OFFSETS:
return StartupMode.GROUP_OFFSETS;
case SCAN_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
buildSpecificOffsets(tableOptions, topic, specificOffsets);
return StartupMode.SPECIFIC_OFFSETS;
case SCAN_STARTUP_MODE_VALUE_TIMESTAMP:
return StartupMode.TIMESTAMP;
default:
throw new TableException("Unsupported startup mode. Validator should have checked that.");
}
}).orElse(StartupMode.GROUP_OFFSETS);
final StartupOptions options = new StartupOptions();
options.startupMode = startupMode;
options.specificOffsets = specificOffsets;
if (startupMode == StartupMode.TIMESTAMP) {
options.startupTimestampMillis = tableOptions.get(SCAN_STARTUP_TIMESTAMP_MILLIS);
}
return options;
}
private static void buildSpecificOffsets(
ReadableConfig tableOptions,
String topic,
Map<KafkaTopicPartition, Long> specificOffsets) {
String specificOffsetsStrOpt = tableOptions.get(SCAN_STARTUP_SPECIFIC_OFFSETS);
final Map<Integer, Long> offsetMap = parseSpecificOffsets(
specificOffsetsStrOpt,
SCAN_STARTUP_SPECIFIC_OFFSETS.key());
offsetMap.forEach((partition, offset) -> {
final KafkaTopicPartition topicPartition = new KafkaTopicPartition(topic, partition);
specificOffsets.put(topicPartition, offset);
});
}
public static Properties getKafkaProperties(Map<String, String> tableOptions) {
final Properties kafkaProperties = new Properties();
if (hasKafkaClientProperties(tableOptions)) {
tableOptions.keySet().stream()
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.forEach(key -> {
final String value = tableOptions.get(key);
final String subKey = key.substring((PROPERTIES_PREFIX).length());
kafkaProperties.put(subKey, value);
});
}
return kafkaProperties;
}
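// Illustrative example for getKafkaProperties: a table option
// 'properties.bootstrap.servers' = 'localhost:9092' is forwarded to the Kafka client as
// 'bootstrap.servers' = 'localhost:9092'; keys without the 'properties.' prefix are ignored here.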
/**
 * The partitioner can be "fixed", "round-robin", or the full class name of a custom {@link FlinkKafkaPartitioner}.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public static Optional<FlinkKafkaPartitioner<RowData>> getFlinkKafkaPartitioner(
ReadableConfig tableOptions,
ClassLoader classLoader) {
return tableOptions.getOptional(SINK_PARTITIONER)
.flatMap((String partitioner) -> {
switch (partitioner) {
case SINK_PARTITIONER_VALUE_FIXED:
return Optional.of(new FlinkFixedPartitioner<>());
case SINK_PARTITIONER_VALUE_ROUND_ROBIN:
return Optional.empty();
// Default fallback to full class name of the partitioner.
default:
return Optional.of(initializePartitioner(partitioner, classLoader));
}
});
}
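// Illustrative example for getFlinkKafkaPartitioner: 'sink.partitioner' = 'fixed' yields a
// FlinkFixedPartitioner, 'round-robin' yields Optional.empty() (no Flink partitioner is set, so
// the Kafka producer distributes records itself), and any other non-empty value is loaded via
// reflection as a custom FlinkKafkaPartitioner implementation.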
/**
 * Parses the specific offsets string into a map from partition to startup offset.
 *
 * <p>The expected format is as follows:
 *
 * <pre>
 * scan.startup.specific-offsets = partition:0,offset:42;partition:1,offset:300
 * </pre>
 *
 * @return map whose key is the partition id and whose value is the startup offset
*/
public static Map<Integer, Long> parseSpecificOffsets(
String specificOffsetsStr,
String optionKey) {
final Map<Integer, Long> offsetMap = new HashMap<>();
final String[] pairs = specificOffsetsStr.split(";");
final String validationExceptionMessage = String.format(
"Invalid properties '%s' should follow the format "
+ "'partition:0,offset:42;partition:1,offset:300', but is '%s'.",
optionKey,
specificOffsetsStr);
if (pairs.length == 0) {
throw new ValidationException(validationExceptionMessage);
}
for (String pair : pairs) {
if (null == pair || pair.length() == 0 || !pair.contains(",")) {
throw new ValidationException(validationExceptionMessage);
}
final String[] kv = pair.split(",");
if (kv.length != 2 ||
!kv[0].startsWith(PARTITION + ':') ||
!kv[1].startsWith(OFFSET + ':')) {
throw new ValidationException(validationExceptionMessage);
}
String partitionValue = kv[0].substring(kv[0].indexOf(":") + 1);
String offsetValue = kv[1].substring(kv[1].indexOf(":") + 1);
try {
final Integer partition = Integer.valueOf(partitionValue);
final Long offset = Long.valueOf(offsetValue);
offsetMap.put(partition, offset);
} catch (NumberFormatException e) {
throw new ValidationException(validationExceptionMessage, e);
}
}
return offsetMap;
}
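// Illustrative example: parseSpecificOffsets("partition:0,offset:42;partition:1,offset:300", key)
// returns the map {0 -> 42L, 1 -> 300L}; any malformed pair results in a ValidationException.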
/** Decides if the table options contain Kafka client properties that start with the 'properties' prefix. */
private static boolean hasKafkaClientProperties(Map<String, String> tableOptions) {
return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
}
/**
* Returns a class value with the given class name.
*/
@SuppressWarnings("rawtypes")
private static FlinkKafkaPartitioner initializePartitioner(String name, ClassLoader classLoader) {
try {
Class<?> clazz = Class.forName(name, true, classLoader);
if (!FlinkKafkaPartitioner.class.isAssignableFrom(clazz)) {
throw new ValidationException(
String.format("Sink partitioner class '%s' should extend from the required class %s",
name,
FlinkKafkaPartitioner.class.getName()));
}
return InstantiationUtil.instantiate(name, FlinkKafkaPartitioner.class, classLoader);
} catch (ClassNotFoundException | FlinkException e) {
throw new ValidationException(
String.format("Could not find and instantiate partitioner class '%s'", name), e);
}
}
// --------------------------------------------------------------------------------------------
// Inner classes
// --------------------------------------------------------------------------------------------
/** Kafka startup options. **/
public static class StartupOptions {
public StartupMode startupMode;
public Map<KafkaTopicPartition, Long> specificOffsets;
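// Only populated when startupMode is StartupMode.TIMESTAMP.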
public long startupTimestampMillis;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import static org.apache.flink.util.CoreMatchers.containsCause;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
/**
* Abstract test base for {@link KafkaDynamicTableFactoryBase}.
*/
public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {
@Rule
public ExpectedException thrown = ExpectedException.none();
private static final String TOPIC = "myTopic";
private static final int PARTITION_0 = 0;
private static final long OFFSET_0 = 100L;
private static final int PARTITION_1 = 1;
private static final long OFFSET_1 = 123L;
private static final String NAME = "name";
private static final String COUNT = "count";
private static final String TIME = "time";
private static final String WATERMARK_EXPRESSION = TIME + " - INTERVAL '5' SECOND";
private static final DataType WATERMARK_DATATYPE = DataTypes.TIMESTAMP(3);
private static final String COMPUTED_COLUMN_NAME = "computed-column";
private static final String COMPUTED_COLUMN_EXPRESSION = COUNT + " + 1.0";
private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3);
private static final Properties KAFKA_PROPERTIES = new Properties();
static {
KAFKA_PROPERTIES.setProperty("group.id", "dummy");
KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
}
private static final String PROPS_SCAN_OFFSETS =
String.format("partition:%d,offset:%d;partition:%d,offset:%d",
PARTITION_0, OFFSET_0, PARTITION_1, OFFSET_1);
private static final TableSchema SOURCE_SCHEMA = TableSchema.builder()
.field(NAME, DataTypes.STRING())
.field(COUNT, DataTypes.DECIMAL(38, 18))
.field(TIME, DataTypes.TIMESTAMP(3))
.field(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION)
.watermark(TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
.build();
private static final TableSchema SINK_SCHEMA = TableSchema.builder()
.field(NAME, DataTypes.STRING())
.field(COUNT, DataTypes.DECIMAL(38, 18))
.field(TIME, DataTypes.TIMESTAMP(3))
.build();
@Test
@SuppressWarnings("unchecked")
public void testTableSource() {
// prepare parameters for Kafka table source
final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();
final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);
ScanFormat<DeserializationSchema<RowData>> scanFormat =
new TestFormatFactory.ScanFormatMock(",", true);
// Construct table source using options and table source factory
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
"default",
"default",
"scanTable");
CatalogTable catalogTable = createKafkaSourceCatalogTable();
final DynamicTableSource actualSource = FactoryUtil.createTableSource(null,
objectIdentifier,
catalogTable,
new Configuration(),
Thread.currentThread().getContextClassLoader());
// Test scan source equals
final KafkaDynamicSourceBase expectedKafkaSource = getExpectedScanSource(
producedDataType,
TOPIC,
KAFKA_PROPERTIES,
scanFormat,
StartupMode.SPECIFIC_OFFSETS,
specificOffsets,
0);
final KafkaDynamicSourceBase actualKafkaSource = (KafkaDynamicSourceBase) actualSource;
assertEquals(expectedKafkaSource, actualKafkaSource);
// Test Kafka consumer
ScanTableSource.ScanRuntimeProvider provider =
actualKafkaSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
assertThat(provider, instanceOf(SourceFunctionProvider.class));
final SourceFunctionProvider sourceFunctionProvider = (SourceFunctionProvider) provider;
final SourceFunction<RowData> sourceFunction = sourceFunctionProvider.createSourceFunction();
assertThat(sourceFunction, instanceOf(getExpectedConsumerClass()));
}
@Test
public void testTableSink() {
final DataType consumedDataType = SINK_SCHEMA.toPhysicalRowDataType();
SinkFormat<SerializationSchema<RowData>> sinkFormat =
new TestFormatFactory.SinkFormatMock(",");
// Construct table sink using options and table sink factory.
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
"default",
"default",
"sinkTable");
final CatalogTable sinkTable = createKafkaSinkCatalogTable();
final DynamicTableSink actualSink = FactoryUtil.createTableSink(
null,
objectIdentifier,
sinkTable,
new Configuration(),
Thread.currentThread().getContextClassLoader());
final DynamicTableSink expectedSink = getExpectedSink(
consumedDataType,
TOPIC,
KAFKA_PROPERTIES,
Optional.of(new FlinkFixedPartitioner<>()),
sinkFormat);
assertEquals(expectedSink, actualSink);
// Test sink format.
final KafkaDynamicSinkBase actualKafkaSink = (KafkaDynamicSinkBase) actualSink;
assertEquals(sinkFormat, actualKafkaSink.sinkFormat);
// Test kafka producer.
DynamicTableSink.SinkRuntimeProvider provider =
actualKafkaSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
assertThat(provider, instanceOf(SinkFunctionProvider.class));
final SinkFunctionProvider sinkFunctionProvider = (SinkFunctionProvider) provider;
final SinkFunction<RowData> sinkFunction = sinkFunctionProvider.createSinkFunction();
assertThat(sinkFunction, instanceOf(getExpectedProducerClass()));
}
// --------------------------------------------------------------------------------------------
// Negative tests
// --------------------------------------------------------------------------------------------
@Test
public void testInvalidScanStartupMode() {
// Construct table source using DDL and table source factory
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
"default",
"default",
"scanTable");
final Map<String, String> modifiedOptions = getModifiedOptions(
getFullSourceOptions(),
options -> {
options.put("scan.startup.mode", "abc");
});
CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
thrown.expect(ValidationException.class);
thrown.expect(containsCause(new ValidationException("Invalid value for option 'scan.startup.mode'. "
+ "Supported values are [earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp], "
+ "but was: abc")));
FactoryUtil.createTableSource(null,
objectIdentifier,
catalogTable,
new Configuration(),
Thread.currentThread().getContextClassLoader());
}
@Test
public void testMissingStartupTimestamp() {
// Construct table source using DDL and table source factory
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
"default",
"default",
"scanTable");
final Map<String, String> modifiedOptions = getModifiedOptions(
getFullSourceOptions(),
options -> {
options.put("scan.startup.mode", "timestamp");
});
CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
thrown.expect(ValidationException.class);
thrown.expect(containsCause(new ValidationException("'scan.startup.timestamp-millis' "
+ "is required in 'timestamp' startup mode but missing.")));
FactoryUtil.createTableSource(null,
objectIdentifier,
catalogTable,
new Configuration(),
Thread.currentThread().getContextClassLoader());
}
@Test
public void testMissingSpecificOffsets() {
// Construct table source using DDL and table source factory
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
"default",
"default",
"scanTable");
final Map<String, String> modifiedOptions = getModifiedOptions(
getFullSourceOptions(),
options -> {
options.remove("scan.startup.specific-offsets");
});
CatalogTable catalogTable = createKafkaSourceCatalogTable(modifiedOptions);
thrown.expect(ValidationException.class);
thrown.expect(containsCause(new ValidationException("'scan.startup.specific-offsets' "
+ "is required in 'specific-offsets' startup mode but missing.")));
FactoryUtil.createTableSource(null,
objectIdentifier,
catalogTable,
new Configuration(),
Thread.currentThread().getContextClassLoader());
}
@Test
public void testInvalidSinkPartitioner() {
ObjectIdentifier objectIdentifier = ObjectIdentifier.of(
"default",
"default",
"sinkTable");
final Map<String, String> modifiedOptions = getModifiedOptions(
getFullSinkOptions(),
options -> {
options.put("sink.partitioner", "abc");
});
final CatalogTable sinkTable = createKafkaSinkCatalogTable(modifiedOptions);
thrown.expect(ValidationException.class);
thrown.expect(containsCause(new ValidationException("Could not find and instantiate partitioner class 'abc'")));
FactoryUtil.createTableSink(
null,
objectIdentifier,
sinkTable,
new Configuration(),
Thread.currentThread().getContextClassLoader());
}
// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
private CatalogTable createKafkaSourceCatalogTable() {
return createKafkaSourceCatalogTable(getFullSourceOptions());
}
private CatalogTable createKafkaSinkCatalogTable() {
return createKafkaSinkCatalogTable(getFullSinkOptions());
}
private CatalogTable createKafkaSourceCatalogTable(Map<String, String> options) {
return new CatalogTableImpl(SOURCE_SCHEMA, options, "scanTable");
}
private CatalogTable createKafkaSinkCatalogTable(Map<String, String> options) {
return new CatalogTableImpl(SINK_SCHEMA, options, "sinkTable");
}
/**
* Returns the full options modified by the given consumer {@code optionModifier}.
*
* @param optionModifier Consumer to modify the options
*/
private static Map<String, String> getModifiedOptions(
Map<String, String> options,
Consumer<Map<String, String>> optionModifier) {
optionModifier.accept(options);
return options;
}
private Map<String, String> getFullSourceOptions() {
Map<String, String> tableOptions = new HashMap<>();
// Kafka specific options.
tableOptions.put("connector", factoryIdentifier());
tableOptions.put("topic", TOPIC);
tableOptions.put("properties.group.id", "dummy");
tableOptions.put("properties.bootstrap.servers", "dummy");
tableOptions.put("scan.startup.mode", "specific-offsets");
tableOptions.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS);
// Format options.
tableOptions.put("format", TestFormatFactory.IDENTIFIER);
final String formatDelimiterKey = String.format("%s.%s",
TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key());
final String failOnMissingKey = String.format("%s.%s",
TestFormatFactory.IDENTIFIER, TestFormatFactory.FAIL_ON_MISSING.key());
tableOptions.put(formatDelimiterKey, ",");
tableOptions.put(failOnMissingKey, "true");
return tableOptions;
}
private Map<String, String> getFullSinkOptions() {
Map<String, String> tableOptions = new HashMap<>();
// Kafka specific options.
tableOptions.put("connector", factoryIdentifier());
tableOptions.put("topic", TOPIC);
tableOptions.put("properties.group.id", "dummy");
tableOptions.put("properties.bootstrap.servers", "dummy");
tableOptions.put("sink.partitioner", KafkaOptions.SINK_PARTITIONER_VALUE_FIXED);
// Format options.
tableOptions.put("format", TestFormatFactory.IDENTIFIER);
final String formatDelimiterKey = String.format("%s.%s",
TestFormatFactory.IDENTIFIER, TestFormatFactory.DELIMITER.key());
tableOptions.put(formatDelimiterKey, ",");
return tableOptions;
}
// --------------------------------------------------------------------------------------------
// For version-specific tests
// --------------------------------------------------------------------------------------------
protected abstract String factoryIdentifier();
protected abstract Class<?> getExpectedConsumerClass();
protected abstract Class<?> getExpectedProducerClass();
protected abstract KafkaDynamicSourceBase getExpectedScanSource(
DataType producedDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestamp
);
protected abstract KafkaDynamicSinkBase getExpectedSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat
);
}
......@@ -16,48 +16,63 @@
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.runtime.utils.TableEnvUtil;
import org.apache.flink.types.Row;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
/**
* Basic Tests for Kafka connector for Table API & SQL.
*/
@RunWith(Parameterized.class)
public abstract class KafkaTableTestBase extends KafkaTestBase {
@Parameterized.Parameter
public boolean isLegacyConnector;
@Parameterized.Parameter(1)
public int topicID;
@Parameterized.Parameters(name = "legacy = {0}, topicId = {1}")
public static Object[] parameters() {
return new Object[][]{
new Object[]{true, 0},
new Object[]{false, 1}
};
}
public abstract String factoryIdentifier();
// Used for the legacy connector ('connector.version' property).
public abstract String kafkaVersion();
@Test
public void testKafkaSourceSink() throws Exception {
final String topic = "tstopic";
final String topic = "tstopic" + topicID;
createTestTopic(topic, 1, 1);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance()
// watermark is only supported in blink planner
// Watermark is only supported in blink planner
.useBlinkPlanner()
.inStreamingMode()
.build()
......@@ -69,55 +84,54 @@ public abstract class KafkaTableTestBase extends KafkaTestBase {
String groupId = standardProps.getProperty("group.id");
String bootstraps = standardProps.getProperty("bootstrap.servers");
// TODO: use DDL to register Kafka once FLINK-15282 is fixed.
// we have to register into Catalog manually because it will use Calcite's ParameterScope
TableSchema schema = TableSchema.builder()
.field("computed-price", DataTypes.DECIMAL(38, 18), "price + 1.0")
.field("price", DataTypes.DECIMAL(38, 18))
.field("currency", DataTypes.STRING())
.field("log_ts", DataTypes.TIMESTAMP(3))
.field("ts", DataTypes.TIMESTAMP(3), "log_ts + INTERVAL '1' SECOND")
.watermark("ts", "ts", DataTypes.TIMESTAMP(3))
.build();
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "kafka");
properties.put("connector.topic", topic);
properties.put("connector.version", kafkaVersion());
properties.put("connector.properties.bootstrap.servers", bootstraps);
properties.put("connector.properties.group.id", groupId);
properties.put("connector.startup-mode", "earliest-offset");
properties.put("format.type", "json");
properties.put("update-mode", "append");
CatalogTableImpl catalogTable = new CatalogTableImpl(
schema,
properties,
"comment"
);
tEnv.getCatalog(tEnv.getCurrentCatalog()).get().createTable(
ObjectPath.fromString(tEnv.getCurrentDatabase() + "." + "kafka"),
catalogTable,
true);
// TODO: use the following DDL instead of the preceding code to register Kafka
// String ddl = "CREATE TABLE kafka (\n" +
// " computed-price as price + 1.0,\n" +
// " price DECIMAL(38, 18),\n" +
// " currency STRING,\n" +
// " log_ts TIMESTAMP(3),\n" +
// " ts AS log_ts + INTERVAL '1' SECOND,\n" +
// " WATERMARK FOR ts AS ts\n" +
// ") with (\n" +
// " 'connector.type' = 'kafka',\n" +
// " 'connector.topic' = '" + topic + "',\n" +
// " 'connector.version' = 'universal',\n" +
// " 'connector.properties.bootstrap.servers' = '" + bootstraps + "',\n" +
// " 'connector.properties.group.id' = '" + groupId + "', \n" +
// " 'connector.startup-mode' = 'earliest-offset', \n" +
// " 'format.type' = 'json',\n" +
// " 'update-mode' = 'append'\n" +
// ")";
// tEnv.sqlUpdate(ddl);
final String createTable;
if (!isLegacyConnector) {
createTable = String.format(
"create table kafka (\n" +
" `computed-price` as price + 1.0,\n" +
" price decimal(38, 18),\n" +
" currency string,\n" +
" log_ts timestamp(3),\n" +
" ts as log_ts + INTERVAL '1' SECOND,\n" +
" watermark for ts as ts\n" +
") with (\n" +
" 'connector' = '%s',\n" +
" 'topic' = '%s',\n" +
" 'properties.bootstrap.servers' = '%s',\n" +
" 'properties.group.id' = '%s',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'json'\n" +
")",
factoryIdentifier(),
topic,
bootstraps,
groupId);
} else {
createTable = String.format(
"create table kafka (\n" +
" `computed-price` as price + 1.0,\n" +
" price decimal(38, 18),\n" +
" currency string,\n" +
" log_ts timestamp(3),\n" +
" ts as log_ts + INTERVAL '1' SECOND,\n" +
" watermark for ts as ts\n" +
") with (\n" +
" 'connector.type' = 'kafka',\n" +
" 'connector.version' = '%s',\n" +
" 'connector.topic' = '%s',\n" +
" 'connector.properties.bootstrap.servers' = '%s',\n" +
" 'connector.properties.group.id' = '%s',\n" +
" 'connector.startup-mode' = 'earliest-offset',\n" +
" 'format.type' = 'json',\n" +
" 'update-mode' = 'append'\n" +
")",
kafkaVersion(),
topic,
bootstraps,
groupId);
}
tEnv.executeSql(createTable);
String initialValues = "INSERT INTO kafka\n" +
"SELECT CAST(price AS DECIMAL(10, 2)), currency, CAST(ts AS TIMESTAMP(3))\n" +
......@@ -140,7 +154,7 @@ public abstract class KafkaTableTestBase extends KafkaTestBase {
"FROM kafka\n" +
"GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";
DataStream<Row> result = tEnv.toAppendStream(tEnv.sqlQuery(query), Row.class);
DataStream<RowData> result = tEnv.toAppendStream(tEnv.sqlQuery(query), RowData.class);
TestingSinkFunction sink = new TestingSinkFunction(2);
result.addSink(sink).setParallelism(1);
......@@ -156,8 +170,8 @@ public abstract class KafkaTableTestBase extends KafkaTestBase {
}
List<String> expected = Arrays.asList(
"2019-12-12 00:00:05.000,2019-12-12 00:00:04.004,3,50.00",
"2019-12-12 00:00:10.000,2019-12-12 00:00:06.006,2,5.33");
"+I(2019-12-12 00:00:05.000,2019-12-12 00:00:04.004,3,50.00)",
"+I(2019-12-12 00:00:10.000,2019-12-12 00:00:06.006,2,5.33)");
assertEquals(expected, TestingSinkFunction.rows);
......@@ -166,7 +180,7 @@ public abstract class KafkaTableTestBase extends KafkaTestBase {
deleteTestTopic(topic);
}
private static final class TestingSinkFunction implements SinkFunction<Row> {
private static final class TestingSinkFunction implements SinkFunction<RowData> {
private static final long serialVersionUID = 455430015321124493L;
private static List<String> rows = new ArrayList<>();
......@@ -179,7 +193,7 @@ public abstract class KafkaTableTestBase extends KafkaTestBase {
}
@Override
public void invoke(Row value, Context context) throws Exception {
public void invoke(RowData value, Context context) throws Exception {
rows.add(value.toString());
if (rows.size() >= expectedSize) {
// job finish
......
......@@ -171,6 +171,15 @@ under the License.
<scope>test</scope>
</dependency>
<!-- Kafka table factory testing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Optional;
import java.util.Properties;
/**
* Kafka table sink for writing data into Kafka.
*/
@Internal
public class KafkaDynamicSink extends KafkaDynamicSinkBase {
public KafkaDynamicSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
super(
consumedDataType,
topic,
properties,
partitioner,
sinkFormat);
}
@Override
protected SinkFunction<RowData> createKafkaProducer(
String topic,
Properties properties,
SerializationSchema<RowData> serializationSchema,
Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
return new FlinkKafkaProducer<>(
topic,
new KeyedSerializationSchemaWrapper<>(serializationSchema),
properties,
partitioner);
}
@Override
public DynamicTableSink copy() {
return new KafkaDynamicSink(
this.consumedDataType,
this.topic,
this.properties,
this.partitioner,
this.sinkFormat);
}
@Override
public String asSummaryString() {
return "Kafka universal table sink";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link org.apache.flink.table.connector.source.DynamicTableSource}.
*/
@Internal
public class KafkaDynamicSource extends KafkaDynamicSourceBase {
/**
 * Creates a generic Kafka {@link DynamicTableSource}.
*
* @param outputDataType Source output data type
* @param topic Kafka topic to consume
* @param properties Properties for the Kafka consumer
* @param scanFormat Scan format for decoding records from Kafka
* @param startupMode Startup mode for the contained consumer
* @param specificStartupOffsets Specific startup offsets; only relevant when startup
 *     mode is {@link StartupMode#SPECIFIC_OFFSETS}
 * @param startupTimestampMillis Startup timestamp in milliseconds; only relevant when startup
 *     mode is {@link StartupMode#TIMESTAMP}
*/
public KafkaDynamicSource(
DataType outputDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
super(
outputDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestampMillis);
}
@Override
protected FlinkKafkaConsumerBase<RowData> createKafkaConsumer(
String topic,
Properties properties,
DeserializationSchema<RowData> deserializationSchema) {
return new FlinkKafkaConsumer<>(topic, deserializationSchema, properties);
}
@Override
public DynamicTableSource copy() {
return new KafkaDynamicSource(
this.outputDataType,
this.topic,
this.properties,
this.scanFormat,
this.startupMode,
this.specificStartupOffsets,
this.startupTimestampMillis);
}
@Override
public String asSummaryString() {
return "Kafka";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
/**
* Factory for creating configured instances of {@link KafkaDynamicSource}.
*/
public class KafkaDynamicTableFactory extends KafkaDynamicTableFactoryBase {
public static final String IDENTIFIER = "kafka";
@Override
protected KafkaDynamicSourceBase createKafkaTableSource(
DataType producedDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis) {
return new KafkaDynamicSource(
producedDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestampMillis);
}
@Override
protected KafkaDynamicSinkBase createKafkaTableSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
return new KafkaDynamicSink(
consumedDataType,
topic,
properties,
partitioner,
sinkFormat);
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicTableFactory
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.ScanFormat;
import org.apache.flink.table.connector.format.SinkFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
/**
 * Test for {@link KafkaDynamicSource} and {@link KafkaDynamicSink} created
 * by {@link KafkaDynamicTableFactory}.
*/
public class KafkaDynamicTableFactoryTest extends KafkaDynamicTableFactoryTestBase {
@Override
protected String factoryIdentifier() {
return KafkaDynamicTableFactory.IDENTIFIER;
}
@Override
protected Class<?> getExpectedConsumerClass() {
return FlinkKafkaConsumer.class;
}
@Override
protected Class<?> getExpectedProducerClass() {
return FlinkKafkaProducer.class;
}
@Override
protected KafkaDynamicSourceBase getExpectedScanSource(
DataType producedDataType,
String topic,
Properties properties,
ScanFormat<DeserializationSchema<RowData>> scanFormat,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestamp) {
return new KafkaDynamicSource(
producedDataType,
topic,
properties,
scanFormat,
startupMode,
specificStartupOffsets,
startupTimestamp);
}
@Override
protected KafkaDynamicSinkBase getExpectedSink(
DataType consumedDataType,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<RowData>> partitioner,
SinkFormat<SerializationSchema<RowData>> sinkFormat) {
return new KafkaDynamicSink(
consumedDataType,
topic,
properties,
partitioner,
sinkFormat);
}
}
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
package org.apache.flink.streaming.connectors.kafka.table;
import org.apache.flink.table.descriptors.KafkaValidator;
......@@ -25,6 +25,11 @@ import org.apache.flink.table.descriptors.KafkaValidator;
*/
public class KafkaTableITCase extends KafkaTableTestBase {
@Override
public String factoryIdentifier() {
return KafkaDynamicTableFactory.IDENTIFIER;
}
@Override
public String kafkaVersion() {
return KafkaValidator.CONNECTOR_VERSION_VALUE_UNIVERSAL;
......
......@@ -99,7 +99,7 @@ public class TestFormatFactory implements DeserializationFormatFactory, Serializ
public final String delimiter;
public final Boolean failOnMissing;
ScanFormatMock(String delimiter, Boolean failOnMissing) {
public ScanFormatMock(String delimiter, Boolean failOnMissing) {
this.delimiter = delimiter;
this.failOnMissing = failOnMissing;
}
......@@ -145,7 +145,7 @@ public class TestFormatFactory implements DeserializationFormatFactory, Serializ
public final String delimiter;
SinkFormatMock(String delimiter) {
public SinkFormatMock(String delimiter) {
this.delimiter = delimiter;
}
......