Commit 7d2c269b authored by yanghua, committed by Timo Walther

[FLINK-11015] Remove deprecated code for format-specific Kafka table connectors

This commit removes all classes and methods that were deprecated in
Flink 1.6 as part of separating the Kafka connectors from the Avro and JSON formats.

This closes #7182.
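
For users migrating off the removed classes, the deprecation notes below all point to the descriptor API. A minimal, hedged sketch of that migration path follows (it is not part of this change); it assumes the Flink 1.7 Kafka, Json, and Schema descriptors, and the topic, broker address, field names, and table name are made-up placeholders:

// Hedged migration sketch: defines a Kafka 0.10 / JSON table source via descriptors
// instead of the removed format-specific classes such as Kafka010JsonTableSource.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class JsonKafkaSourceMigrationSketch {

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

		tableEnv
			// connector: the version string replaces the Kafka010* class hierarchy
			.connect(new Kafka()
				.version("0.10")
				.topic("my-topic")                                  // hypothetical topic
				.property("bootstrap.servers", "localhost:9092")    // hypothetical broker
				.startFromEarliest())
			// format: replaces the JSON-specific source, including setFailOnMissingField(...)
			.withFormat(new Json()
				.failOnMissingField(false)
				.deriveSchema())
			// schema: replaces the TableSchema constructor argument
			.withSchema(new Schema()
				.field("user", Types.STRING())
				.field("amount", Types.DOUBLE()))
			.inAppendMode()
			.registerTableSource("KafkaJsonTable");                 // hypothetical table name
	}
}

With this split the format dependency (flink-json or flink-avro) has to be added by the user explicitly, which matches the optional flink-avro / flink-json dependencies being dropped from the connector poms below.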
Parent 761e8099
@@ -86,40 +86,8 @@ under the License.
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-avro. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-json. -->
<optional>true</optional>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka010AvroTableSource extends KafkaAvroTableSource {
/**
* Creates a Kafka 0.10 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka010AvroTableSource(
String topic,
Properties properties,
TableSchema schema,
Class<? extends SpecificRecordBase> record) {
super(
topic,
properties,
schema,
record);
}
/**
* Sets a mapping from schema fields to fields of the produced Avro record.
*
* <p>A field mapping is required if the fields of the produced table should be named differently than
* the fields of the Avro record.
* The key of the provided Map refers to the field of the table schema,
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}
/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}
/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
}
/**
* Returns a builder to configure and create a {@link Kafka010AvroTableSource}.
*
* @return A builder to configure and create a {@link Kafka010AvroTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static Builder builder() {
return new Builder();
}
/**
* A builder to configure and create a {@link Kafka010AvroTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaAvroTableSource.Builder<Kafka010AvroTableSource, Kafka010AvroTableSource.Builder> {
@Override
protected boolean supportsKafkaTimestamps() {
return true;
}
@Override
protected Kafka010AvroTableSource.Builder builder() {
return this;
}
/**
* Builds and configures a {@link Kafka010AvroTableSource}.
*
* @return A configured {@link Kafka010AvroTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka010AvroTableSource build() {
Kafka010AvroTableSource tableSource = new Kafka010AvroTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getAvroRecordClass());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
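
As a rough equivalent to the Avro-specific source removed above, the Avro format descriptor together with the Schema descriptor's rowtime()/proctime() methods covers what setRowtimeAttributeDescriptor(...) and setProctimeAttribute(...) used to do. A hedged sketch, assuming the Flink 1.7 descriptors; the inline Avro schema, field names, and topic are made-up placeholders, and new Avro().recordClass(...) would be the counterpart of the removed SpecificRecord constructor:

// Hedged sketch: Avro-formatted Kafka 0.10 source with event-time and processing-time attributes.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Avro;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

public class AvroKafkaSourceMigrationSketch {

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

		tableEnv
			.connect(new Kafka()
				.version("0.10")
				.topic("orders")                                    // hypothetical topic
				.property("bootstrap.servers", "localhost:9092"))
			// an inline schema keeps the sketch self-contained; recordClass(...) is the alternative
			.withFormat(new Avro()
				.avroSchema(
					"{\"type\": \"record\", \"name\": \"Order\", \"fields\": ["
					+ "{\"name\": \"id\", \"type\": \"long\"},"
					+ "{\"name\": \"ts\", \"type\": \"long\"}]}"))
			.withSchema(new Schema()
				.field("id", Types.LONG())
				// replaces setRowtimeAttributeDescriptor(...)
				.field("ts", Types.SQL_TIMESTAMP())
					.rowtime(new Rowtime()
						.timestampsFromField("ts")
						.watermarksPeriodicBounded(60000))
				// replaces setProctimeAttribute(...)
				.field("proctime", Types.SQL_TIMESTAMP()).proctime())
			.inAppendMode()
			.registerTableSource("KafkaAvroTable");                 // hypothetical table name
	}
}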
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.types.Row;
import java.util.Optional;
import java.util.Properties;
/**
* Kafka 0.10 {@link KafkaTableSinkBase} that serializes data in JSON format.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka010JsonTableSink extends KafkaJsonTableSink {
/**
* Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10
* topic with fixed partition assignment.
*
* <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
* <ul>
* <li>If the number of Kafka partitions is less than the number of sink instances, different
* sink instances will write to the same partition.</li>
* <li>If the number of Kafka partitions is higher than the number of sink instances, some
* Kafka partitions won't receive data.</li>
* </ul>
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @deprecated Use table descriptors instead of implementation-specific classes.
*/
@Deprecated
public Kafka010JsonTableSink(String topic, Properties properties) {
super(topic, properties, new FlinkFixedPartitioner<>());
}
/**
* Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.10
* topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
* @deprecated Use table descriptors instead of implementation-specific classes.
*/
@Deprecated
public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer010<>(
topic,
serializationSchema,
properties,
partitioner.orElse(new FlinkFixedPartitioner<>()));
}
@Override
protected Kafka010JsonTableSink createCopy() {
return new Kafka010JsonTableSink(
topic,
properties,
partitioner.orElse(new FlinkFixedPartitioner<>()));
}
}
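
The sink side migrates the same way. A hedged sketch replacing Kafka010JsonTableSink, assuming the Kafka descriptor's sinkPartitionerFixed() (mirroring the FlinkFixedPartitioner default above) and a registerTableSink(...) call on the connect descriptor are available in the 1.7 descriptor API; topic, fields, and table name are placeholders:

// Hedged sketch: JSON-formatted Kafka 0.10 sink registered via descriptors.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class JsonKafkaSinkMigrationSketch {

	public static void main(String[] args) {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

		tableEnv
			.connect(new Kafka()
				.version("0.10")
				.topic("results")                                   // hypothetical topic
				.property("bootstrap.servers", "localhost:9092")
				.sinkPartitionerFixed())                            // mirrors the FlinkFixedPartitioner default
			.withFormat(new Json().deriveSchema())
			.withSchema(new Schema()
				.field("user", Types.STRING())
				.field("amount", Types.DOUBLE()))
			.inAppendMode()
			.registerTableSink("KafkaJsonSink");                    // assumes registerTableSink(..) is available
	}
}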
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.10.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka010JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.10 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka010JsonTableSource(
String topic,
Properties properties,
TableSchema tableSchema,
TableSchema jsonSchema) {
super(topic, properties, tableSchema, jsonSchema);
}
/**
* Sets the flag that specifies the behavior in case of missing fields.
* The TableSource will fail for missing fields if set to true. If set to false, missing fields are set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFailOnMissingField(boolean failOnMissingField) {
super.setFailOnMissingField(failOnMissingField);
}
/**
* Sets the mapping from table schema fields to JSON schema fields.
*
* @param fieldMapping The mapping from table schema fields to JSON schema fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}
/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}
/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
}
/**
* Returns a builder to configure and create a {@link Kafka010JsonTableSource}.
*
* @return A builder to configure and create a {@link Kafka010JsonTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static Kafka010JsonTableSource.Builder builder() {
return new Kafka010JsonTableSource.Builder();
}
/**
* A builder to configure and create a {@link Kafka010JsonTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaJsonTableSource.Builder<Kafka010JsonTableSource, Kafka010JsonTableSource.Builder> {
@Override
protected boolean supportsKafkaTimestamps() {
return true;
}
@Override
protected Kafka010JsonTableSource.Builder builder() {
return this;
}
/**
* Builds and configures a {@link Kafka010JsonTableSource}.
*
* @return A configured {@link Kafka010JsonTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka010JsonTableSource build() {
Kafka010JsonTableSource tableSource = new Kafka010JsonTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getJsonSchema());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka010AvroTableSource}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka010AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
protected KafkaTableSourceBase.Builder getBuilder() {
return Kafka010AvroTableSource.builder();
}
@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) AvroRowDeserializationSchema.class;
}
@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer010.class;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import java.util.Properties;
/**
* Tests for the {@link Kafka010JsonTableSink}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sinks.
*/
@Deprecated
public class Kafka010JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
@Override
protected KafkaTableSinkBase createTableSink(
String topic,
Properties properties,
FlinkKafkaPartitioner<Row> partitioner) {
return new Kafka010JsonTableSink(
topic,
properties,
partitioner);
}
@Override
protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
return JsonRowSerializationSchema.class;
}
@Override
protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
return FlinkKafkaProducer010.class;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_010;
/**
* Tests for legacy Kafka010JsonTableSourceFactory.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka010JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_010;
}
@Override
protected KafkaJsonTableSource.Builder builder() {
return Kafka010JsonTableSource.builder();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka010JsonTableSource}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka010JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
protected KafkaTableSourceBase.Builder getBuilder() {
return Kafka010JsonTableSource.builder();
}
@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) JsonRowDeserializationSchema.class;
}
@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer010.class;
}
}
@@ -86,22 +86,6 @@ under the License.
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-avro. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-json. -->
<optional>true</optional>
</dependency>
<!-- test dependencies -->
<dependency>
@@ -112,23 +96,6 @@ under the License.
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka011AvroTableSource extends KafkaAvroTableSource {
/**
* Creates a Kafka 0.11 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka011AvroTableSource(
String topic,
Properties properties,
TableSchema schema,
Class<? extends SpecificRecordBase> record) {
super(
topic,
properties,
schema,
record);
}
/**
* Sets a mapping from schema fields to fields of the produced Avro record.
*
* <p>A field mapping is required if the fields of the produced table should be named differently than
* the fields of the Avro record.
* The key of the provided Map refers to the field of the table schema,
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}
/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}
/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
}
/**
* Returns a builder to configure and create a {@link Kafka011AvroTableSource}.
* @return A builder to configure and create a {@link Kafka011AvroTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static Builder builder() {
return new Builder();
}
/**
* A builder to configure and create a {@link Kafka011AvroTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaAvroTableSource.Builder<Kafka011AvroTableSource, Kafka011AvroTableSource.Builder> {
@Override
protected boolean supportsKafkaTimestamps() {
return true;
}
@Override
protected Kafka011AvroTableSource.Builder builder() {
return this;
}
/**
* Builds and configures a {@link Kafka011AvroTableSource}.
*
* @return A configured {@link Kafka011AvroTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka011AvroTableSource build() {
Kafka011AvroTableSource tableSource = new Kafka011AvroTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getAvroRecordClass());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.11.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka011JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.11 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka011JsonTableSource(
String topic,
Properties properties,
TableSchema tableSchema,
TableSchema jsonSchema) {
super(topic, properties, tableSchema, jsonSchema);
}
/**
* Sets the flag that specifies the behavior in case of missing fields.
* The TableSource will fail for missing fields if set to true. If set to false, missing fields are set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFailOnMissingField(boolean failOnMissingField) {
super.setFailOnMissingField(failOnMissingField);
}
/**
* Sets the mapping from table schema fields to JSON schema fields.
*
* @param fieldMapping The mapping from table schema fields to JSON schema fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}
/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}
/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
}
/**
* Returns a builder to configure and create a {@link Kafka011JsonTableSource}.
*
* @return A builder to configure and create a {@link Kafka011JsonTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static Kafka011JsonTableSource.Builder builder() {
return new Kafka011JsonTableSource.Builder();
}
/**
* A builder to configure and create a {@link Kafka011JsonTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaJsonTableSource.Builder<Kafka011JsonTableSource, Kafka011JsonTableSource.Builder> {
@Override
protected boolean supportsKafkaTimestamps() {
return true;
}
@Override
protected Kafka011JsonTableSource.Builder builder() {
return this;
}
/**
* Builds and configures a {@link Kafka011JsonTableSource}.
*
* @return A configured {@link Kafka011JsonTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka011JsonTableSource build() {
Kafka011JsonTableSource tableSource = new Kafka011JsonTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getJsonSchema());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka011AvroTableSource}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka011AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
protected KafkaTableSourceBase.Builder getBuilder() {
return Kafka011AvroTableSource.builder();
}
@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) AvroRowDeserializationSchema.class;
}
@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer011.class;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_011;
/**
* Tests for legacy Kafka011JsonTableSourceFactory.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka011JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_011;
}
@Override
protected KafkaJsonTableSource.Builder builder() {
return Kafka011JsonTableSource.builder();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka011JsonTableSource}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka011JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
protected KafkaTableSourceBase.Builder getBuilder() {
return Kafka011JsonTableSource.builder();
}
@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) JsonRowDeserializationSchema.class;
}
@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer011.class;
}
}
@@ -81,22 +81,6 @@ under the License.
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-avro. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-json. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.binary.version}</artifactId>
@@ -151,22 +135,6 @@ under the License.
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.8.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka08AvroTableSource extends KafkaAvroTableSource {
/**
* Creates a Kafka 0.8 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka08AvroTableSource(
String topic,
Properties properties,
TableSchema schema,
Class<? extends SpecificRecordBase> record) {
super(
topic,
properties,
schema,
record);
}
/**
* Sets a mapping from schema fields to fields of the produced Avro record.
*
* <p>A field mapping is required if the fields of the produced table should be named differently than
* the fields of the Avro record.
* The key of the provided Map refers to the field of the table schema,
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}
/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}
/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
}
/**
* Returns a builder to configure and create a {@link Kafka08AvroTableSource}.
*
* @return A builder to configure and create a {@link Kafka08AvroTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static Builder builder() {
return new Builder();
}
/**
* A builder to configure and create a {@link Kafka08AvroTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaAvroTableSource.Builder<Kafka08AvroTableSource, Kafka08AvroTableSource.Builder> {
@Override
protected boolean supportsKafkaTimestamps() {
return false;
}
@Override
protected Kafka08AvroTableSource.Builder builder() {
return this;
}
/**
* Builds and configures a {@link Kafka08AvroTableSource}.
*
* @return A configured {@link Kafka08AvroTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka08AvroTableSource build() {
Kafka08AvroTableSource tableSource = new Kafka08AvroTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getAvroRecordClass());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.types.Row;
import java.util.Optional;
import java.util.Properties;
/**
* Kafka 0.8 {@link KafkaTableSinkBase} that serializes data in JSON format.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
/**
* Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
* topic with fixed partition assignment.
*
* <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
* <ul>
* <li>If the number of Kafka partitions is less than the number of sink instances, different
* sink instances will write to the same partition.</li>
* <li>If the number of Kafka partitions is higher than the number of sink instances, some
* Kafka partitions won't receive data.</li>
* </ul>
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @deprecated Use table descriptors instead of implementation-specific classes.
*/
@Deprecated
public Kafka08JsonTableSink(String topic, Properties properties) {
super(topic, properties, new FlinkFixedPartitioner<>());
}
/**
* Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
* topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
* @deprecated Use table descriptors instead of implementation-specific classes.
*/
@Deprecated
public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
/**
* Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.8
* topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
*
* @deprecated This is a deprecated constructor that does not correctly handle partitioning when
* producing to multiple topics. Use
* {@link #Kafka08JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
*/
@Deprecated
public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
}
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer08<>(
topic,
serializationSchema,
properties,
partitioner.orElse(new FlinkFixedPartitioner<>()));
}
@Override
protected Kafka08JsonTableSink createCopy() {
return new Kafka08JsonTableSink(
topic,
properties,
partitioner.orElse(new FlinkFixedPartitioner<>()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.8.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka08JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.8 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka08JsonTableSource(
String topic,
Properties properties,
TableSchema tableSchema,
TableSchema jsonSchema) {
super(topic, properties, tableSchema, jsonSchema);
}
/**
* Sets the flag that specifies the behavior in case of missing fields.
* The TableSource will fail for missing fields if set to true. If set to false, missing fields are set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFailOnMissingField(boolean failOnMissingField) {
super.setFailOnMissingField(failOnMissingField);
}
/**
* Sets the mapping from table schema fields to JSON schema fields.
*
* @param fieldMapping The mapping from table schema fields to JSON schema fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}
/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}
/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
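// Illustrative only (not part of the original class): a rowtime descriptor for this setter is typically
// assembled from an existing timestamp field plus a watermark strategy, e.g.
//   new RowtimeAttributeDescriptor("ts", new ExistingField("ts"), new BoundedOutOfOrderTimestamps(30000L))
// where ExistingField and BoundedOutOfOrderTimestamps are the extractors/strategies shipped with flink-table
// and the field name "ts" is a hypothetical column of the table schema.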
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
}
/**
* Returns a builder to configure and create a {@link Kafka08JsonTableSource}.
*
* @return A builder to configure and create a {@link Kafka08JsonTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static Kafka08JsonTableSource.Builder builder() {
return new Kafka08JsonTableSource.Builder();
}
/**
* A builder to configure and create a {@link Kafka08JsonTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaJsonTableSource.Builder<Kafka08JsonTableSource, Kafka08JsonTableSource.Builder> {
@Override
protected boolean supportsKafkaTimestamps() {
return false;
}
@Override
protected Kafka08JsonTableSource.Builder builder() {
return this;
}
/**
* Builds and configures a {@link Kafka08JsonTableSource}.
*
* @return A configured {@link Kafka08JsonTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka08JsonTableSource build() {
Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getJsonSchema());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
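As a rough sketch of the descriptor-based replacement that the deprecation notes above point to (the tableEnv variable, the props properties, the topic, and the field names are illustrative and not part of this commit), an equivalent Kafka 0.8 JSON source could be registered roughly like this, where Kafka, Json, and Schema are the org.apache.flink.table.descriptors classes and Types is org.apache.flink.table.api.Types:

    tableEnv.connect(
            new Kafka()
                .version("0.8")
                .topic("clicks")
                .properties(props))
        .withFormat(new Json().deriveSchema())
        .withSchema(new Schema()
            .field("user", Types.STRING())
            .field("clicks", Types.LONG()))
        .inAppendMode()
        .registerTableSource("Clicks");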
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka08AvroTableSource}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka08AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
protected KafkaTableSourceBase.Builder getBuilder() {
return Kafka08AvroTableSource.builder();
}
@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) AvroRowDeserializationSchema.class;
}
@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer08.class;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import java.util.Properties;
/**
* Tests for the {@link Kafka08JsonTableSink}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sinks.
*/
@Deprecated
public class Kafka08JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
@Override
protected KafkaTableSinkBase createTableSink(
String topic,
Properties properties,
FlinkKafkaPartitioner<Row> partitioner) {
return new Kafka08JsonTableSink(
topic,
properties,
partitioner);
}
@Override
protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
return JsonRowSerializationSchema.class;
}
@Override
protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
return FlinkKafkaProducer08.class;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_08;
/**
* Tests for the legacy Kafka08JsonTableSourceFactory.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka08JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_08;
}
@Override
protected KafkaJsonTableSource.Builder builder() {
return Kafka08JsonTableSource.builder();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka08JsonTableSource}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka08JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
protected KafkaTableSourceBase.Builder getBuilder() {
return Kafka08JsonTableSource.builder();
}
@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) JsonRowDeserializationSchema.class;
}
@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer08.class;
}
}
......@@ -76,22 +76,6 @@ under the License.
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-avro. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-json. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
......@@ -100,22 +84,6 @@ under the License.
<!-- test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.9.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka09AvroTableSource extends KafkaAvroTableSource {
/**
* Creates a Kafka 0.9 Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param record Avro specific record.
*/
public Kafka09AvroTableSource(
String topic,
Properties properties,
TableSchema schema,
Class<? extends SpecificRecordBase> record) {
super(
topic,
properties,
schema,
record);
}
/**
* Sets a mapping from schema fields to fields of the produced Avro record.
*
* <p>A field mapping is required if the fields of the produced table should be named differently from
* the fields of the Avro record.
* The key of the provided Map refers to the field of the table schema,
* the value to the field of the Avro record.</p>
*
* @param fieldMapping A mapping from schema fields to Avro fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}
/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}
/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
}
/**
* Returns a builder to configure and create a {@link Kafka09AvroTableSource}.
*
* @return A builder to configure and create a {@link Kafka09AvroTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static Builder builder() {
return new Builder();
}
/**
* A builder to configure and create a {@link Kafka09AvroTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaAvroTableSource.Builder<Kafka09AvroTableSource, Kafka09AvroTableSource.Builder> {
@Override
protected boolean supportsKafkaTimestamps() {
return false;
}
@Override
protected Kafka09AvroTableSource.Builder builder() {
return this;
}
/**
* Builds and configures a {@link Kafka09AvroTableSource}.
*
* @return A configured {@link Kafka09AvroTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka09AvroTableSource build() {
Kafka09AvroTableSource tableSource = new Kafka09AvroTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getAvroRecordClass());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
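Likewise for Avro, a hedged sketch of the descriptor-based equivalent of this class (the SensorEvent record, the topic, and the field names are invented for illustration; the Avro descriptor ships with flink-avro). The .from(...) call mirrors the field mapping described in setFieldMapping above:

    tableEnv.connect(
            new Kafka()
                .version("0.9")
                .topic("sensor-events")
                .properties(props))
        .withFormat(new Avro().recordClass(SensorEvent.class))
        .withSchema(new Schema()
            .field("id", Types.STRING()).from("sensorId")
            .field("temperature", Types.DOUBLE()))
        .inAppendMode()
        .registerTableSource("Sensors");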
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.types.Row;
import java.util.Optional;
import java.util.Properties;
/**
* Kafka 0.9 {@link KafkaTableSinkBase} that serializes data in JSON format.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka09JsonTableSink extends KafkaJsonTableSink {
/**
* Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
* topic with fixed partition assignment.
*
* <p>Each parallel TableSink instance will write its rows to a single Kafka partition.</p>
* <ul>
* <li>If the number of Kafka partitions is less than the number of sink instances, different
* sink instances will write to the same partition.</li>
* <li>If the number of Kafka partitions is higher than the number of sink instances, some
* Kafka partitions won't receive data.</li>
* </ul>
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @deprecated Use table descriptors instead of implementation-specific classes.
*/
@Deprecated
public Kafka09JsonTableSink(String topic, Properties properties) {
super(topic, properties, new FlinkFixedPartitioner<>());
}
/**
* Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
* topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
* @deprecated Use table descriptors instead of implementation-specific classes.
*/
@Deprecated
public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
/**
* Creates {@link KafkaTableSinkBase} to write table rows as JSON-encoded records to a Kafka 0.9
* topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
*
* @deprecated This is a deprecated constructor that does not correctly handle partitioning when
* producing to multiple topics. Use
* {@link #Kafka09JsonTableSink(String, Properties, FlinkKafkaPartitioner)} instead.
*/
@Deprecated
public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner) {
super(topic, properties, new FlinkKafkaDelegatePartitioner<>(partitioner));
}
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(
String topic,
Properties properties,
SerializationSchema<Row> serializationSchema,
Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer09<>(
topic,
serializationSchema,
properties,
partitioner.orElse(new FlinkFixedPartitioner<>()));
}
@Override
protected Kafka09JsonTableSink createCopy() {
return new Kafka09JsonTableSink(
topic,
properties,
partitioner.orElse(new FlinkFixedPartitioner<>()));
}
}
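The fixed partition assignment described in the single-argument constructor's Javadoc can be replaced by supplying a custom FlinkKafkaPartitioner. A minimal sketch under the assumption that the first field of each row should drive the partition choice (class name and field choice are illustrative, not part of this commit):

    public class FirstFieldHashPartitioner extends FlinkKafkaPartitioner<Row> {
        @Override
        public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // Hash the first row field onto one of the partitions available for the target topic.
            Object partitionKey = record.getField(0);
            int hash = partitionKey == null ? 0 : partitionKey.hashCode();
            return partitions[(hash & 0x7fffffff) % partitions.length];
        }
    }

Passing new FirstFieldHashPartitioner() to the three-argument constructor then spreads rows across partitions by key instead of by sink subtask index.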
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
/**
* Kafka {@link StreamTableSource} for Kafka 0.9.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public class Kafka09JsonTableSource extends KafkaJsonTableSource {
/**
* Creates a Kafka 0.9 JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public Kafka09JsonTableSource(
String topic,
Properties properties,
TableSchema tableSchema,
TableSchema jsonSchema) {
super(topic, properties, tableSchema, jsonSchema);
}
/**
* Sets the flag that specifies the behavior in case of missing fields.
* If set to true, the TableSource fails for missing fields. If set to false, missing fields are set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFailOnMissingField(boolean failOnMissingField) {
super.setFailOnMissingField(failOnMissingField);
}
/**
* Sets the mapping from table schema fields to JSON schema fields.
*
* @param fieldMapping The mapping from table schema fields to JSON schema fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setFieldMapping(Map<String, String> fieldMapping) {
super.setFieldMapping(fieldMapping);
}
/**
* Declares a field of the schema to be a processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public void setProctimeAttribute(String proctimeAttribute) {
super.setProctimeAttribute(proctimeAttribute);
}
/**
* Declares a field of the schema to be a rowtime attribute.
*
* @param rowtimeAttributeDescriptor The descriptor of the rowtime attribute.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public void setRowtimeAttributeDescriptor(RowtimeAttributeDescriptor rowtimeAttributeDescriptor) {
Preconditions.checkNotNull(rowtimeAttributeDescriptor, "Rowtime attribute descriptor must not be null.");
super.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
@Override
protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
}
/**
* Returns a builder to configure and create a {@link Kafka09JsonTableSource}.
*
* @return A builder to configure and create a {@link Kafka09JsonTableSource}.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static Kafka09JsonTableSource.Builder builder() {
return new Kafka09JsonTableSource.Builder();
}
/**
* A builder to configure and create a {@link Kafka09JsonTableSource}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
public static class Builder extends KafkaJsonTableSource.Builder<Kafka09JsonTableSource, Kafka09JsonTableSource.Builder> {
@Override
protected boolean supportsKafkaTimestamps() {
return false;
}
@Override
protected Kafka09JsonTableSource.Builder builder() {
return this;
}
/**
* Builds and configures a {@link Kafka09JsonTableSource}.
*
* @return A configured {@link Kafka09JsonTableSource}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
@Override
public Kafka09JsonTableSource build() {
Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
getTopic(),
getKafkaProps(),
getTableSchema(),
getJsonSchema());
super.configureTableSource(tableSource);
return tableSource;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka09AvroTableSource}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka09AvroTableSourceTest extends KafkaAvroTableSourceTestBase {
@Override
protected KafkaTableSourceBase.Builder getBuilder() {
return Kafka09AvroTableSource.builder();
}
@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) AvroRowDeserializationSchema.class;
}
@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer09.class;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import java.util.Properties;
/**
* Tests for the {@link Kafka09JsonTableSink}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sinks.
*/
@Deprecated
public class Kafka09JsonTableSinkTest extends KafkaTableSinkBaseTestBase {
@Override
protected KafkaTableSinkBase createTableSink(
String topic,
Properties properties,
FlinkKafkaPartitioner<Row> partitioner) {
return new Kafka09JsonTableSink(
topic,
properties,
partitioner);
}
@Override
protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
return JsonRowSerializationSchema.class;
}
@Override
protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
return FlinkKafkaProducer09.class;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_VERSION_VALUE_09;
/**
* Tests for the legacy Kafka09JsonTableSourceFactory.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka09JsonTableSourceFactoryTest extends KafkaJsonTableSourceFactoryTestBase {
@Override
protected String version() {
return CONNECTOR_VERSION_VALUE_09;
}
@Override
protected KafkaJsonTableSource.Builder builder() {
return Kafka09JsonTableSource.builder();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.types.Row;
/**
* Tests for the {@link Kafka09JsonTableSource}.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public class Kafka09JsonTableSourceTest extends KafkaJsonTableSourceTestBase {
@Override
protected KafkaTableSourceBase.Builder getBuilder() {
return Kafka09JsonTableSource.builder();
}
@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) JsonRowDeserializationSchema.class;
}
@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer09.class;
}
}
......@@ -66,22 +66,6 @@ under the License.
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-avro. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-json. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
......@@ -189,22 +173,6 @@ under the License.
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import java.util.Map;
import java.util.Properties;
/**
* A version-agnostic Kafka Avro {@link StreamTableSource}.
*
* <p>The version-specific Kafka consumers need to extend this class and
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
@Internal
public abstract class KafkaAvroTableSource extends KafkaTableSourceBase {
/**
* Creates a generic Kafka Avro {@link StreamTableSource} using a given {@link SpecificRecord}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param schema Schema of the produced table.
* @param avroRecordClass Class of the Avro record that is read from the Kafka topic.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
protected KafkaAvroTableSource(
String topic,
Properties properties,
TableSchema schema,
Class<? extends SpecificRecordBase> avroRecordClass) {
super(
schema,
topic,
properties,
new AvroRowDeserializationSchema(avroRecordClass));
}
@Override
public String explainSource() {
return "KafkaAvroTableSource";
}
//////// HELPER METHODS
/**
* Abstract builder for a {@link KafkaAvroTableSource} to be extended by builders of subclasses of
* KafkaAvroTableSource.
*
* @param <T> Type of the KafkaAvroTableSource produced by the builder.
* @param <B> Type of the KafkaAvroTableSource.Builder subclass.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
protected abstract static class Builder<T extends KafkaAvroTableSource, B extends KafkaAvroTableSource.Builder>
extends KafkaTableSourceBase.Builder<T, B> {
private Class<? extends SpecificRecordBase> avroClass;
private Map<String, String> fieldMapping;
/**
* Sets the class of the Avro records that are read from the Kafka topic.
*
* @param avroClass The class of the Avro records that are read from the Kafka topic.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B forAvroRecordClass(Class<? extends SpecificRecordBase> avroClass) {
this.avroClass = avroClass;
return builder();
}
/**
* Sets a mapping from schema fields to fields of the produced Avro record.
*
* <p>A field mapping is required if the fields of the produced table should be named differently from
* the fields of the Avro record.
* The key of the provided Map refers to the field of the table schema,
* the value to the field of the Avro record.</p>
*
* @param schemaToAvroMapping A mapping from schema fields to Avro fields.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B withTableToAvroMapping(Map<String, String> schemaToAvroMapping) {
this.fieldMapping = schemaToAvroMapping;
return builder();
}
/**
* Returns the configured Avro class.
*
* @return The configured Avro class.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected Class<? extends SpecificRecordBase> getAvroRecordClass() {
return this.avroClass;
}
@Override
protected void configureTableSource(T source) {
super.configureTableSource(source);
source.setFieldMapping(this.fieldMapping);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;
import java.util.Properties;
/**
* Base class for {@link KafkaTableSinkBase} that serializes data in JSON format.
*
* @deprecated Use table descriptors instead of implementation-specific classes.
*/
@Deprecated
@Internal
public abstract class KafkaJsonTableSink extends KafkaTableSinkBase {
/**
* Creates KafkaJsonTableSink.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
* @param partitioner Kafka partitioner
* @deprecated Use table descriptors instead of implementation-specific classes.
*/
@Deprecated
public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
super(topic, properties, partitioner);
}
@Override
protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
return new JsonRowSerializationSchema(rowSchema);
}
}
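For reference, the serialization schema created above simply turns each Row into JSON bytes according to the sink's field names and types. A small standalone sketch (field names are illustrative; Types here refers to org.apache.flink.api.common.typeinfo.Types):

    RowTypeInfo rowSchema = new RowTypeInfo(
        new TypeInformation<?>[]{Types.STRING, Types.LONG},
        new String[]{"user", "clicks"});
    SerializationSchema<Row> serializer = new JsonRowSerializationSchema(rowSchema);
    byte[] json = serializer.serialize(Row.of("alice", 42L)); // -> {"user":"alice","clicks":42}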
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.formats.json.JsonRowDeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import java.util.Map;
import java.util.Properties;
/**
* A version-agnostic Kafka JSON {@link StreamTableSource}.
*
* <p>The version-specific Kafka consumers need to extend this class and
* override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
*
* <p>Both the field names and the field types are used to parse the JSON-encoded Kafka messages.
*
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
@Internal
public abstract class KafkaJsonTableSource extends KafkaTableSourceBase {
/**
* Creates a generic Kafka JSON {@link StreamTableSource}.
*
* @param topic Kafka topic to consume.
* @param properties Properties for the Kafka consumer.
* @param tableSchema The schema of the table.
* @param jsonSchema The schema of the JSON messages to decode from Kafka.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected KafkaJsonTableSource(
String topic,
Properties properties,
TableSchema tableSchema,
TableSchema jsonSchema) {
super(
tableSchema,
topic,
properties,
new JsonRowDeserializationSchema(jsonSchema.toRowType()));
}
@Override
public String explainSource() {
return "KafkaJsonTableSource";
}
//////// SETTERS FOR OPTIONAL PARAMETERS
/**
* Sets the flag that specifies the behavior in case of missing fields.
* If set to true, the TableSource fails for missing fields. If set to false, missing fields are set to null.
*
* @param failOnMissingField Flag that specifies the TableSource behavior in case of missing fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected void setFailOnMissingField(boolean failOnMissingField) {
((JsonRowDeserializationSchema) getDeserializationSchema()).setFailOnMissingField(failOnMissingField);
}
//////// HELPER METHODS
/**
* Abstract builder for a {@link KafkaJsonTableSource} to be extended by builders of subclasses of
* KafkaJsonTableSource.
*
* @param <T> Type of the KafkaJsonTableSource produced by the builder.
* @param <B> Type of the KafkaJsonTableSource.Builder subclass.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
protected abstract static class Builder<T extends KafkaJsonTableSource, B extends KafkaJsonTableSource.Builder>
extends KafkaTableSourceBase.Builder<T, B> {
private TableSchema jsonSchema;
private Map<String, String> fieldMapping;
private boolean failOnMissingField = false;
/**
* Sets the schema of the JSON-encoded Kafka messages.
* If not set, the JSON messages are decoded with the table schema.
*
* @param jsonSchema The schema of the JSON-encoded Kafka messages.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B forJsonSchema(TableSchema jsonSchema) {
this.jsonSchema = jsonSchema;
return builder();
}
/**
* Sets a mapping from schema fields to fields of the JSON schema.
*
* <p>A field mapping is required if the fields of the produced table should be named differently from
* the fields of the JSON records.
* The key of the provided Map refers to the field of the table schema,
* the value to the field in the JSON schema.</p>
*
* @param tableToJsonMapping A mapping from table schema fields to JSON schema fields.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B withTableToJsonMapping(Map<String, String> tableToJsonMapping) {
this.fieldMapping = tableToJsonMapping;
return builder();
}
/**
* Sets the flag that decides whether to fail if a field is missing.
*
* @param failOnMissingField If set to true, the TableSource fails if there is a missing
* field.
* If set to false, a missing field is set to null.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B failOnMissingField(boolean failOnMissingField) {
this.failOnMissingField = failOnMissingField;
return builder();
}
/**
* Returns the configured JSON schema. If no JSON schema was configured, the table schema
* is returned.
*
* @return The JSON schema for the TableSource.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected TableSchema getJsonSchema() {
if (jsonSchema != null) {
return this.jsonSchema;
} else {
return getTableSchema();
}
}
/**
* Configures a TableSource with optional parameters.
*
* @param source The TableSource to configure.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected void configureTableSource(T source) {
super.configureTableSource(source);
// configure field mapping
source.setFieldMapping(this.fieldMapping);
// configure missing field behavior
source.setFailOnMissingField(this.failOnMissingField);
}
}
}
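To make the builder contract above concrete, here is a hedged sketch of how a subclass builder was typically wired up before this removal (topic, properties, and field names are invented; forTopic, withKafkaProperties, and withSchema come from the KafkaTableSourceBase.Builder, which is not shown in this hunk; Types is org.apache.flink.table.api.Types):

    Map<String, String> mapping = new HashMap<>();
    mapping.put("user", "u");     // table field "user" is read from JSON field "u"
    mapping.put("clicks", "c");   // table field "clicks" is read from JSON field "c"

    Kafka08JsonTableSource source = Kafka08JsonTableSource.builder()
        .forTopic("clicks")
        .withKafkaProperties(props)
        .withSchema(TableSchema.builder()
            .field("user", Types.STRING())
            .field("clicks", Types.LONG())
            .build())
        .forJsonSchema(TableSchema.builder()
            .field("u", Types.STRING())
            .field("c", Types.LONG())
            .build())
        .withTableToJsonMapping(mapping)
        .failOnMissingField(true)
        .build();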
......@@ -21,11 +21,11 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.util.TableConnectorUtil;
import org.apache.flink.types.Row;
......@@ -45,10 +45,8 @@ import java.util.Properties;
@Internal
public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
// TODO make all attributes final and mandatory once we drop support for format-specific table sinks
/** The schema of the table. */
private final Optional<TableSchema> schema;
private final TableSchema schema;
/** The Kafka topic to write to. */
protected final String topic;
......@@ -57,47 +55,22 @@ public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
protected final Properties properties;
/** Serialization schema for encoding records to Kafka. */
protected Optional<SerializationSchema<Row>> serializationSchema;
protected final SerializationSchema<Row> serializationSchema;
/** Partitioner to select Kafka partition for each item. */
protected final Optional<FlinkKafkaPartitioner<Row>> partitioner;
// legacy variables
protected String[] fieldNames;
protected TypeInformation[] fieldTypes;
protected KafkaTableSinkBase(
TableSchema schema,
String topic,
Properties properties,
Optional<FlinkKafkaPartitioner<Row>> partitioner,
SerializationSchema<Row> serializationSchema) {
this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null."));
this.schema = Preconditions.checkNotNull(schema, "Schema must not be null.");
this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
this.properties = Preconditions.checkNotNull(properties, "Properties must not be null.");
this.partitioner = Preconditions.checkNotNull(partitioner, "Partitioner must not be null.");
this.serializationSchema = Optional.of(Preconditions.checkNotNull(
serializationSchema, "Serialization schema must not be null."));
}
/**
* Creates KafkaTableSinkBase.
*
* @param topic Kafka topic to write to.
* @param properties Properties for the Kafka producer.
* @param partitioner Partitioner to select Kafka partition for each item
* @deprecated Use table descriptors instead of implementation-specific classes.
*/
@Deprecated
public KafkaTableSinkBase(
String topic,
Properties properties,
FlinkKafkaPartitioner<Row> partitioner) {
this.schema = Optional.empty();
this.topic = Preconditions.checkNotNull(topic, "topic");
this.properties = Preconditions.checkNotNull(properties, "properties");
this.partitioner = Optional.of(Preconditions.checkNotNull(partitioner, "partitioner"));
this.serializationSchema = Optional.empty();
this.serializationSchema = Preconditions.checkNotNull(serializationSchema, "Serialization schema must not be null.");
}
/**
......@@ -115,72 +88,39 @@ public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
SerializationSchema<Row> serializationSchema,
Optional<FlinkKafkaPartitioner<Row>> partitioner);
/**
* Create serialization schema for converting table rows into bytes.
*
* @param rowSchema the schema of the row to serialize.
* @return Instance of serialization schema
* @deprecated Use the constructor to pass a serialization schema instead.
*/
@Deprecated
protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
}
/**
* Create a deep copy of this sink.
*
* @return Deep copy of this sink
*/
@Deprecated
protected KafkaTableSinkBase createCopy() {
throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
}
@Override
public void emitDataStream(DataStream<Row> dataStream) {
SinkFunction<Row> kafkaProducer = createKafkaProducer(
final SinkFunction<Row> kafkaProducer = createKafkaProducer(
topic,
properties,
serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")),
serializationSchema,
partitioner);
dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), getFieldNames()));
}
@Override
public TypeInformation<Row> getOutputType() {
return schema
.map(TableSchema::toRowType)
.orElseGet(() -> new RowTypeInfo(getFieldTypes()));
return schema.toRowType();
}
@Override
public String[] getFieldNames() {
return schema.map(TableSchema::getFieldNames).orElse(fieldNames);
return schema.getFieldNames();
}
@Override
public TypeInformation<?>[] getFieldTypes() {
return schema.map(TableSchema::getFieldTypes).orElse(fieldTypes);
return schema.getFieldTypes();
}
@Override
public KafkaTableSinkBase configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
if (schema.isPresent()) {
// a fixed schema is defined so reconfiguration is not supported
throw new UnsupportedOperationException("Reconfiguration of this sink is not supported.");
if (!Arrays.equals(getFieldNames(), fieldNames) || !Arrays.equals(getFieldTypes(), fieldTypes)) {
throw new ValidationException("Reconfiguration with different fields is not allowed. " +
"Expected: " + Arrays.toString(getFieldNames()) + " / " + Arrays.toString(getFieldTypes()) + ". " +
"But was: " + Arrays.toString(fieldNames) + " / " + Arrays.toString(fieldTypes));
}
// legacy code
KafkaTableSinkBase copy = createCopy();
copy.fieldNames = Preconditions.checkNotNull(fieldNames, "fieldNames");
copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
"Number of provided field names and types does not match.");
RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
copy.serializationSchema = Optional.of(createSerializationSchema(rowSchema));
return copy;
return this;
}
@Override
......@@ -191,26 +131,21 @@ public abstract class KafkaTableSinkBase implements AppendStreamTableSink<Row> {
if (o == null || getClass() != o.getClass()) {
return false;
}
KafkaTableSinkBase that = (KafkaTableSinkBase) o;
final KafkaTableSinkBase that = (KafkaTableSinkBase) o;
return Objects.equals(schema, that.schema) &&
Objects.equals(topic, that.topic) &&
Objects.equals(properties, that.properties) &&
Objects.equals(serializationSchema, that.serializationSchema) &&
Objects.equals(partitioner, that.partitioner) &&
Arrays.equals(fieldNames, that.fieldNames) &&
Arrays.equals(fieldTypes, that.fieldTypes);
Objects.equals(partitioner, that.partitioner);
}
@Override
public int hashCode() {
int result = Objects.hash(
return Objects.hash(
schema,
topic,
properties,
serializationSchema,
partitioner);
result = 31 * result + Arrays.hashCode(fieldNames);
result = 31 * result + Arrays.hashCode(fieldTypes);
return result;
}
}
......@@ -28,15 +28,11 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.sources.DefinedFieldMapping;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
import org.apache.flink.table.util.TableConnectorUtil;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
......@@ -62,19 +58,18 @@ public abstract class KafkaTableSourceBase implements
DefinedFieldMapping {
// common table source attributes
// TODO make all attributes final once we drop support for format-specific table sources
/** The schema of the table. */
private final TableSchema schema;
/** Field name of the processing time attribute, null if no processing time field is defined. */
private Optional<String> proctimeAttribute;
private final Optional<String> proctimeAttribute;
/** Descriptor for a rowtime attribute. */
private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
private final List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
/** Mapping for the fields of the table schema to fields of the physical returned type. */
private Optional<Map<String, String>> fieldMapping;
private final Optional<Map<String, String>> fieldMapping;
// Kafka-specific attributes
......@@ -88,10 +83,10 @@ public abstract class KafkaTableSourceBase implements
private final DeserializationSchema<Row> deserializationSchema;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
private StartupMode startupMode;
private final StartupMode startupMode;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private Map<KafkaTopicPartition, Long> specificStartupOffsets;
private final Map<KafkaTopicPartition, Long> specificStartupOffsets;
/**
* Creates a generic Kafka {@link StreamTableSource}.
......@@ -221,9 +216,7 @@ public abstract class KafkaTableSourceBase implements
if (this == o) {
return true;
}
// TODO force classes to be equal once we drop support for format-specific table sources
// if (o == null || getClass() != o.getClass()) {
if (o == null || !(o instanceof KafkaTableSourceBase)) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final KafkaTableSourceBase that = (KafkaTableSourceBase) o;
......@@ -323,63 +316,6 @@ public abstract class KafkaTableSourceBase implements
return rowtimeAttributeDescriptors;
}
//////// SETTERS FOR OPTIONAL PARAMETERS
/**
* Declares a field of the schema to be the processing time attribute.
*
* @param proctimeAttribute The name of the field that becomes the processing time field.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected void setProctimeAttribute(String proctimeAttribute) {
this.proctimeAttribute = validateProctimeAttribute(Optional.ofNullable(proctimeAttribute));
}
/**
* Declares a list of fields to be rowtime attributes.
*
* @param rowtimeAttributeDescriptors The descriptors of the rowtime attributes.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected void setRowtimeAttributeDescriptors(List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors) {
this.rowtimeAttributeDescriptors = validateRowtimeAttributeDescriptors(rowtimeAttributeDescriptors);
}
/**
* Sets the startup mode of the TableSource.
*
* @param startupMode The startup mode.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected void setStartupMode(StartupMode startupMode) {
this.startupMode = Preconditions.checkNotNull(startupMode);
}
/**
* Sets the startup offsets of the TableSource; only relevant when the startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
*
* @param specificStartupOffsets The startup offsets for different partitions.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected void setSpecificStartupOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
this.specificStartupOffsets = Preconditions.checkNotNull(specificStartupOffsets);
}
/**
* Mapping for the fields of the table schema to fields of the physical returned type.
*
* @param fieldMapping The mapping from table schema fields to format schema fields.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected void setFieldMapping(Map<String, String> fieldMapping) {
this.fieldMapping = Optional.ofNullable(fieldMapping);
}
//////// ABSTRACT METHODS FOR SUBCLASSES
/**
......@@ -395,296 +331,4 @@ public abstract class KafkaTableSourceBase implements
Properties properties,
DeserializationSchema<Row> deserializationSchema);
/**
* Abstract builder for a {@link KafkaTableSourceBase} to be extended by builders of subclasses of
* KafkaTableSourceBase.
*
* @param <T> Type of the KafkaTableSourceBase produced by the builder.
* @param <B> Type of the KafkaTableSourceBase.Builder subclass.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
protected abstract static class Builder<T extends KafkaTableSourceBase, B extends KafkaTableSourceBase.Builder> {
private String topic;
private Properties kafkaProps;
private TableSchema schema;
private String proctimeAttribute;
private RowtimeAttributeDescriptor rowtimeAttributeDescriptor;
/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private Map<KafkaTopicPartition, Long> specificStartupOffsets = null;
/**
* Sets the topic from which the table is read.
*
* @param topic The topic from which the table is read.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B forTopic(String topic) {
Preconditions.checkNotNull(topic, "Topic must not be null.");
Preconditions.checkArgument(this.topic == null, "Topic has already been set.");
this.topic = topic;
return builder();
}
/**
* Sets the configuration properties for the Kafka consumer.
*
* @param props The configuration properties for the Kafka consumer.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B withKafkaProperties(Properties props) {
Preconditions.checkNotNull(props, "Properties must not be null.");
Preconditions.checkArgument(this.kafkaProps == null, "Properties have already been set.");
this.kafkaProps = props;
return builder();
}
/**
* Sets the schema of the produced table.
*
* @param schema The schema of the produced table.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B withSchema(TableSchema schema) {
Preconditions.checkNotNull(schema, "Schema must not be null.");
Preconditions.checkArgument(this.schema == null, "Schema has already been set.");
this.schema = schema;
return builder();
}
/**
* Configures a field of the table to be a processing time attribute.
* The configured field must be present in the table schema and of type {@link Types#SQL_TIMESTAMP()}.
*
* @param proctimeAttribute The name of the processing time attribute in the table schema.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B withProctimeAttribute(String proctimeAttribute) {
Preconditions.checkNotNull(proctimeAttribute, "Proctime attribute must not be null.");
Preconditions.checkArgument(!proctimeAttribute.isEmpty(), "Proctime attribute must not be empty.");
Preconditions.checkArgument(this.proctimeAttribute == null, "Proctime attribute has already been set.");
this.proctimeAttribute = proctimeAttribute;
return builder();
}
/**
* Configures a field of the table to be a rowtime attribute.
* The configured field must be present in the table schema and of type {@link Types#SQL_TIMESTAMP()}.
*
* @param rowtimeAttribute The name of the rowtime attribute in the table schema.
* @param timestampExtractor The {@link TimestampExtractor} to extract the rowtime attribute from the physical type.
* @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B withRowtimeAttribute(
String rowtimeAttribute,
TimestampExtractor timestampExtractor,
WatermarkStrategy watermarkStrategy) {
Preconditions.checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
Preconditions.checkArgument(!rowtimeAttribute.isEmpty(), "Rowtime attribute must not be empty.");
Preconditions.checkNotNull(timestampExtractor, "Timestamp extractor must not be null.");
Preconditions.checkNotNull(watermarkStrategy, "Watermark assigner must not be null.");
Preconditions.checkArgument(this.rowtimeAttributeDescriptor == null,
"Currently, only one rowtime attribute is supported.");
this.rowtimeAttributeDescriptor = new RowtimeAttributeDescriptor(
rowtimeAttribute,
timestampExtractor,
watermarkStrategy);
return builder();
}
/**
* Configures the Kafka timestamp to be a rowtime attribute.
*
* <p>Note: Kafka supports message timestamps only since version 0.10.</p>
*
* @param rowtimeAttribute The name of the rowtime attribute in the table schema.
* @param watermarkStrategy The {@link WatermarkStrategy} to generate watermarks for the rowtime attribute.
* @return The builder.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B withKafkaTimestampAsRowtimeAttribute(
String rowtimeAttribute,
WatermarkStrategy watermarkStrategy) {
Preconditions.checkNotNull(rowtimeAttribute, "Rowtime attribute must not be null.");
Preconditions.checkArgument(!rowtimeAttribute.isEmpty(), "Rowtime attribute must not be empty.");
Preconditions.checkNotNull(watermarkStrategy, "Watermark assigner must not be null.");
Preconditions.checkArgument(this.rowtimeAttributeDescriptor == null,
"Currently, only one rowtime attribute is supported.");
Preconditions.checkArgument(supportsKafkaTimestamps(), "Kafka timestamps are only supported since Kafka 0.10.");
this.rowtimeAttributeDescriptor = new RowtimeAttributeDescriptor(
rowtimeAttribute,
new StreamRecordTimestamp(),
watermarkStrategy);
return builder();
}
/**
* Configures the TableSource to start reading from the earliest offset for all partitions.
*
* @see FlinkKafkaConsumerBase#setStartFromEarliest()
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B fromEarliest() {
this.startupMode = StartupMode.EARLIEST;
this.specificStartupOffsets = null;
return builder();
}
/**
* Configures the TableSource to start reading from the latest offset for all partitions.
*
* @see FlinkKafkaConsumerBase#setStartFromLatest()
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B fromLatest() {
this.startupMode = StartupMode.LATEST;
this.specificStartupOffsets = null;
return builder();
}
/**
* Configures the TableSource to start reading from any committed group offsets found in Zookeeper / Kafka brokers.
*
* @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B fromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
this.specificStartupOffsets = null;
return builder();
}
/**
* Configures the TableSource to start reading partitions from specific offsets, set independently for each partition.
*
* @param specificStartupOffsets the specified offsets for partitions
* @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
public B fromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
this.startupMode = StartupMode.SPECIFIC_OFFSETS;
this.specificStartupOffsets = Preconditions.checkNotNull(specificStartupOffsets);
return builder();
}
/**
* Returns the configured topic.
*
* @return the configured topic.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected String getTopic() {
return this.topic;
}
/**
* Returns the configured Kafka properties.
*
* @return the configured Kafka properties.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected Properties getKafkaProps() {
return this.kafkaProps;
}
/**
* Returns the configured table schema.
*
* @return the configured table schema.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected TableSchema getTableSchema() {
return this.schema;
}
/**
* True if the KafkaSource supports Kafka timestamps, false otherwise.
*
* @return True if the KafkaSource supports Kafka timestamps, false otherwise.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected abstract boolean supportsKafkaTimestamps();
/**
* Configures a TableSource with optional parameters.
*
* @param tableSource The TableSource to configure.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected void configureTableSource(T tableSource) {
// configure processing time attributes
tableSource.setProctimeAttribute(proctimeAttribute);
// configure rowtime attributes
if (rowtimeAttributeDescriptor == null) {
tableSource.setRowtimeAttributeDescriptors(Collections.emptyList());
} else {
tableSource.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
}
tableSource.setStartupMode(startupMode);
switch (startupMode) {
case EARLIEST:
case LATEST:
case GROUP_OFFSETS:
break;
case SPECIFIC_OFFSETS:
tableSource.setSpecificStartupOffsets(specificStartupOffsets);
break;
}
}
/**
* Returns the builder.
* @return the builder.
* @deprecated Use the {@link org.apache.flink.table.descriptors.Kafka} descriptor together
* with descriptors for schema and format instead. Descriptors allow for
* implementation-agnostic definition of tables. See also
* {@link org.apache.flink.table.api.TableEnvironment#connect(ConnectorDescriptor)}.
*/
@Deprecated
protected abstract B builder();
/**
* Builds the configured {@link KafkaTableSourceBase}.
* @return The configured {@link KafkaTableSourceBase}.
* @deprecated Use table descriptors instead of implementation-specific builders.
*/
@Deprecated
protected abstract KafkaTableSourceBase build();
}
}
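// --- Illustrative sketch (hypothetical, not taken from the Flink codebase) ---------------------
// The deprecated builder above points to the descriptor API as its replacement. A minimal
// sketch of an equivalent descriptor-based source definition, assuming a Flink 1.7-era
// environment; the topic, servers, and field names are placeholders, not values from this diff.
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

/** Hypothetical example class, for illustration only. */
public class DescriptorBasedKafkaSourceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        // replaces Builder#forTopic / #withKafkaProperties / #withSchema /
        // #withProctimeAttribute / #withRowtimeAttribute / #fromEarliest
        tableEnv.connect(
                new Kafka()
                    .version("0.10")
                    .topic("example-topic")
                    .property("bootstrap.servers", "localhost:9092")
                    .property("group.id", "example-group")
                    .startFromEarliest())
            .withFormat(
                new Json()
                    .deriveSchema()
                    .failOnMissingField(true))
            .withSchema(
                new Schema()
                    .field("field1", Types.LONG)
                    .field("field2", Types.STRING)
                    .field("event-time", Types.SQL_TIMESTAMP).rowtime(
                        new Rowtime()
                            .timestampsFromField("time")
                            .watermarksPeriodicAscending())
                    .field("proc-time", Types.SQL_TIMESTAMP).proctime())
            .inAppendMode()
            .registerTableSource("ExampleTable");
    }
}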
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
/**
* DeserializationSchema that deserializes a JSON String into an ObjectNode.
*
* <p>Fields can be accessed by calling objectNode.get(&lt;name>).as(&lt;type>)
*
* @deprecated Please use {@link JsonNodeDeserializationSchema} in the "flink-json" module.
*/
@PublicEvolving
@Deprecated
public class JSONDeserializationSchema extends JsonNodeDeserializationSchema {
// delegate everything to the parent class
}
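// --- Illustrative sketch (hypothetical, not taken from the Flink codebase) ---------------------
// A minimal usage sketch of the flink-json replacement named in the deprecation note above.
// The topic, properties, and field name are placeholders; the ObjectNode import assumes the
// shaded Jackson type used by flink-json in this Flink version.
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

import java.util.Properties;

/** Hypothetical example class, for illustration only. */
public class JsonNodeDeserializationExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "example-group");

        DataStream<ObjectNode> stream = env.addSource(
            new FlinkKafkaConsumer010<>("example-topic", new JsonNodeDeserializationSchema(), props));

        // fields are accessed as objectNode.get(<name>).as(<type>)
        stream.map(new MapFunction<ObjectNode, String>() {
            @Override
            public String map(ObjectNode node) {
                return node.get("name").asText();
            }
        }).print();

        env.execute("json-node-deserialization-example");
    }
}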
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
/**
* Deserialization schema from JSON to {@link Row}.
*
* <p>Deserializes the <code>byte[]</code> messages as a JSON object and reads
* the specified fields.
*
* <p>Failures during deserialization are forwarded as wrapped IOExceptions.
*
* @deprecated Please use {@link org.apache.flink.formats.json.JsonRowDeserializationSchema} in
* the "flink-json" module.
*/
@PublicEvolving
@Deprecated
public class JsonRowDeserializationSchema extends org.apache.flink.formats.json.JsonRowDeserializationSchema {
/**
* Creates a JSON deserialization schema for the given fields and types.
*
* @param typeInfo Type information describing the result type. The field names are used
* to parse the JSON file and so are the types.
*
* @deprecated Please use {@link org.apache.flink.formats.json.JsonRowDeserializationSchema} in
* the "flink-json" module.
*/
@Deprecated
public JsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
super(typeInfo);
}
}
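// --- Illustrative sketch (hypothetical, not taken from the Flink codebase) ---------------------
// A minimal sketch of constructing the flink-json replacement directly, as the deprecation
// note above suggests; the row layout ("name" STRING, "count" INT) and the sample message
// are placeholders.
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;

/** Hypothetical example class, for illustration only. */
public class JsonRowDeserializationExample {

    public static void main(String[] args) throws Exception {
        // field names and types of the expected JSON objects
        TypeInformation<Row> typeInfo = Types.ROW_NAMED(
            new String[] {"name", "count"},
            Types.STRING, Types.INT);

        DeserializationSchema<Row> schema =
            new org.apache.flink.formats.json.JsonRowDeserializationSchema(typeInfo);

        Row row = schema.deserialize(
            "{\"name\": \"apple\", \"count\": 3}".getBytes(StandardCharsets.UTF_8));
        System.out.println(row); // e.g. apple,3
    }
}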
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;
/**
* Serialization schema that serializes an object into JSON bytes.
*
* <p>Serializes the input {@link Row} object into a JSON string and
* converts it into <code>byte[]</code>.
*
* <p>Result <code>byte[]</code> messages can be deserialized using
* {@link JsonRowDeserializationSchema}.
*
* @deprecated Please use {@link org.apache.flink.formats.json.JsonRowSerializationSchema} in
* the "flink-json" module.
*/
@PublicEvolving
@Deprecated
public class JsonRowSerializationSchema extends org.apache.flink.formats.json.JsonRowSerializationSchema {
/**
* Creates a JSON serialization schema for the given fields and types.
*
* @param typeInfo The schema of the rows to encode.
*
* @deprecated Please use {@link org.apache.flink.formats.json.JsonRowSerializationSchema} in
* the "flink-json" module.
*/
@Deprecated
public JsonRowSerializationSchema(TypeInformation<Row> typeInfo) {
super(typeInfo);
}
}
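// --- Illustrative sketch (hypothetical, not taken from the Flink codebase) ---------------------
// The corresponding flink-json serialization side, mirroring the deserialization sketch above;
// the row layout is again a placeholder.
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

import java.nio.charset.StandardCharsets;

/** Hypothetical example class, for illustration only. */
public class JsonRowSerializationExample {

    public static void main(String[] args) throws Exception {
        TypeInformation<Row> typeInfo = Types.ROW_NAMED(
            new String[] {"name", "count"},
            Types.STRING, Types.INT);

        SerializationSchema<Row> schema =
            new org.apache.flink.formats.json.JsonRowSerializationSchema(typeInfo);

        Row row = new Row(2);
        row.setField(0, "apple");
        row.setField(1, 3);

        byte[] json = schema.serialize(row);
        System.out.println(new String(json, StandardCharsets.UTF_8)); // e.g. {"name":"apple","count":3}
    }
}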
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.avro.generated.DifferentSchemaRecord;
import org.apache.flink.formats.avro.generated.SchemaRecord;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Types;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/**
* Abstract test base for all Kafka Avro table sources.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public abstract class KafkaAvroTableSourceTestBase extends KafkaTableSourceBuilderTestBase {
@Override
protected void configureBuilder(KafkaTableSourceBase.Builder builder) {
super.configureBuilder(builder);
((KafkaAvroTableSource.Builder) builder).forAvroRecordClass(SchemaRecord.class);
}
@Test
public void testSameFieldsAvroClass() {
KafkaAvroTableSource.Builder b = (KafkaAvroTableSource.Builder) getBuilder();
this.configureBuilder(b);
KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
// check return type
RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
assertNotNull(returnType);
assertEquals(5, returnType.getArity());
// check field names
assertEquals("field1", returnType.getFieldNames()[0]);
assertEquals("field2", returnType.getFieldNames()[1]);
assertEquals("time1", returnType.getFieldNames()[2]);
assertEquals("time2", returnType.getFieldNames()[3]);
assertEquals("field3", returnType.getFieldNames()[4]);
// check field types
assertEquals(Types.LONG(), returnType.getTypeAt(0));
assertEquals(Types.STRING(), returnType.getTypeAt(1));
assertEquals(Types.LONG(), returnType.getTypeAt(2));
assertEquals(Types.LONG(), returnType.getTypeAt(3));
assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
// check field mapping
assertNull(source.getFieldMapping());
// check if DataStream type matches with TableSource.getReturnType()
assertEquals(source.getReturnType(),
source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
}
@Test
public void testDifferentFieldsAvroClass() {
KafkaAvroTableSource.Builder b = (KafkaAvroTableSource.Builder) getBuilder();
super.configureBuilder(b);
b.withProctimeAttribute("time2");
Map<String, String> mapping = new HashMap<>();
mapping.put("field1", "otherField1");
mapping.put("field2", "otherField2");
mapping.put("field3", "otherField3");
// set Avro class with different fields
b.forAvroRecordClass(DifferentSchemaRecord.class);
b.withTableToAvroMapping(mapping);
KafkaAvroTableSource source = (KafkaAvroTableSource) b.build();
// check return type
RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
assertNotNull(returnType);
assertEquals(6, returnType.getArity());
// check field names
assertEquals("otherField1", returnType.getFieldNames()[0]);
assertEquals("otherField2", returnType.getFieldNames()[1]);
assertEquals("otherTime1", returnType.getFieldNames()[2]);
assertEquals("otherField3", returnType.getFieldNames()[3]);
assertEquals("otherField4", returnType.getFieldNames()[4]);
assertEquals("otherField5", returnType.getFieldNames()[5]);
// check field types
assertEquals(Types.LONG(), returnType.getTypeAt(0));
assertEquals(Types.STRING(), returnType.getTypeAt(1));
assertEquals(Types.LONG(), returnType.getTypeAt(2));
assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
assertEquals(Types.FLOAT(), returnType.getTypeAt(4));
assertEquals(Types.INT(), returnType.getTypeAt(5));
// check field mapping
Map<String, String> fieldMapping = source.getFieldMapping();
assertNotNull(fieldMapping);
assertEquals(3, fieldMapping.size());
assertEquals("otherField1", fieldMapping.get("field1"));
assertEquals("otherField2", fieldMapping.get("field2"));
assertEquals("otherField3", fieldMapping.get("field3"));
// check if DataStream type matches with TableSource.getReturnType()
assertEquals(source.getReturnType(),
source.getDataStream(StreamExecutionEnvironment.getExecutionEnvironment()).getType());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.descriptors.FormatDescriptor;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.TestTableDescriptor;
import org.apache.flink.table.factories.StreamTableSourceFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceUtil;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
/**
* Tests for legacy KafkaJsonTableSourceFactory.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public abstract class KafkaJsonTableSourceFactoryTestBase {
private static final String JSON_SCHEMA =
"{" +
" 'title': 'Fruit'," +
" 'type': 'object'," +
" 'properties': {" +
" 'name': {" +
" 'type': 'string'" +
" }," +
" 'count': {" +
" 'type': 'integer'" +
" }," +
" 'time': {" +
" 'description': 'row time'," +
" 'type': 'string'," +
" 'format': 'date-time'" +
" }" +
" }," +
" 'required': ['name', 'count', 'time']" +
"}";
private static final String TOPIC = "test-topic";
protected abstract String version();
protected abstract KafkaJsonTableSource.Builder builder();
@Test
public void testTableSourceFromJsonSchema() {
testTableSource(
new Json()
.jsonSchema(JSON_SCHEMA)
.failOnMissingField(true)
);
}
@Test
public void testTableSourceDerivedSchema() {
testTableSource(
new Json()
.deriveSchema()
.failOnMissingField(true)
);
}
private void testTableSource(FormatDescriptor format) {
// construct table source using a builder
final Map<String, String> tableJsonMapping = new HashMap<>();
tableJsonMapping.put("fruit-name", "name");
tableJsonMapping.put("name", "name");
tableJsonMapping.put("count", "count");
tableJsonMapping.put("time", "time");
final Properties props = new Properties();
props.put("group.id", "test-group");
props.put("bootstrap.servers", "localhost:1234");
final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
final KafkaTableSourceBase builderSource = builder()
.forJsonSchema(TableSchema.fromTypeInfo(JsonRowSchemaConverter.convert(JSON_SCHEMA)))
.failOnMissingField(true)
.withTableToJsonMapping(tableJsonMapping)
.withKafkaProperties(props)
.forTopic(TOPIC)
.fromSpecificOffsets(specificOffsets)
.withSchema(
TableSchema.builder()
.field("fruit-name", Types.STRING)
.field("count", Types.BIG_DEC)
.field("event-time", Types.SQL_TIMESTAMP)
.field("proc-time", Types.SQL_TIMESTAMP)
.build())
.withProctimeAttribute("proc-time")
.withRowtimeAttribute("event-time", new ExistingField("time"), new AscendingTimestamps())
.build();
TableSourceUtil.validateTableSource(builderSource);
// construct table source using descriptors and table source factory
final Map<Integer, Long> offsets = new HashMap<>();
offsets.put(0, 100L);
offsets.put(1, 123L);
final TestTableDescriptor testDesc = new TestTableDescriptor(
new Kafka()
.version(version())
.topic(TOPIC)
.properties(props)
.startFromSpecificOffsets(offsets))
.withFormat(format)
.withSchema(
new Schema()
.field("fruit-name", Types.STRING).from("name")
.field("count", Types.BIG_DEC) // no from so it must match with the input
.field("event-time", Types.SQL_TIMESTAMP).rowtime(
new Rowtime().timestampsFromField("time").watermarksPeriodicAscending())
.field("proc-time", Types.SQL_TIMESTAMP).proctime())
.inAppendMode();
final Map<String, String> properties = testDesc.toProperties();
final TableSource<?> factorySource =
TableFactoryService.find(StreamTableSourceFactory.class, properties)
.createStreamTableSource(properties);
assertEquals(builderSource, factorySource);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/**
* Abstract test base for all Kafka JSON table sources.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public abstract class KafkaJsonTableSourceTestBase extends KafkaTableSourceBuilderTestBase {
@Test
public void testJsonEqualsTableSchema() {
KafkaJsonTableSource.Builder b = (KafkaJsonTableSource.Builder) getBuilder();
this.configureBuilder(b);
KafkaJsonTableSource source = (KafkaJsonTableSource) b.build();
// check return type
RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
assertNotNull(returnType);
assertEquals(5, returnType.getArity());
// check field names
assertEquals("field1", returnType.getFieldNames()[0]);
assertEquals("field2", returnType.getFieldNames()[1]);
assertEquals("time1", returnType.getFieldNames()[2]);
assertEquals("time2", returnType.getFieldNames()[3]);
assertEquals("field3", returnType.getFieldNames()[4]);
// check field types
assertEquals(Types.LONG(), returnType.getTypeAt(0));
assertEquals(Types.STRING(), returnType.getTypeAt(1));
assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(2));
assertEquals(Types.SQL_TIMESTAMP(), returnType.getTypeAt(3));
assertEquals(Types.DOUBLE(), returnType.getTypeAt(4));
// check field mapping
assertNull(source.getFieldMapping());
}
@Test
public void testCustomJsonSchemaWithMapping() {
KafkaJsonTableSource.Builder b = (KafkaJsonTableSource.Builder) getBuilder();
super.configureBuilder(b);
b.withProctimeAttribute("time2");
Map<String, String> mapping = new HashMap<>();
mapping.put("field1", "otherField1");
mapping.put("field2", "otherField2");
mapping.put("field3", "otherField3");
// set JSON schema with different fields
b.forJsonSchema(TableSchema.builder()
.field("otherField1", Types.LONG())
.field("otherField2", Types.STRING())
.field("rowtime", Types.LONG())
.field("otherField3", Types.DOUBLE())
.field("otherField4", Types.BYTE())
.field("otherField5", Types.INT()).build());
b.withTableToJsonMapping(mapping);
b.withRowtimeAttribute("time1", new ExistingField("timeField1"), new AscendingTimestamps());
KafkaJsonTableSource source = (KafkaJsonTableSource) b.build();
// check return type
RowTypeInfo returnType = (RowTypeInfo) source.getReturnType();
assertNotNull(returnType);
assertEquals(6, returnType.getArity());
// check field names
assertEquals("otherField1", returnType.getFieldNames()[0]);
assertEquals("otherField2", returnType.getFieldNames()[1]);
assertEquals("rowtime", returnType.getFieldNames()[2]);
assertEquals("otherField3", returnType.getFieldNames()[3]);
assertEquals("otherField4", returnType.getFieldNames()[4]);
assertEquals("otherField5", returnType.getFieldNames()[5]);
// check field types
assertEquals(Types.LONG(), returnType.getTypeAt(0));
assertEquals(Types.STRING(), returnType.getTypeAt(1));
assertEquals(Types.LONG(), returnType.getTypeAt(2));
assertEquals(Types.DOUBLE(), returnType.getTypeAt(3));
assertEquals(Types.BYTE(), returnType.getTypeAt(4));
assertEquals(Types.INT(), returnType.getTypeAt(5));
// check field mapping
Map<String, String> fieldMapping = source.getFieldMapping();
assertNotNull(fieldMapping);
assertEquals(3, fieldMapping.size());
assertEquals("otherField1", fieldMapping.get("field1"));
assertEquals("otherField2", fieldMapping.get("field2"));
assertEquals("otherField3", fieldMapping.get("field3"));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
import org.junit.Test;
import java.util.Optional;
import java.util.Properties;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Abstract test base for all Kafka table sink tests.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sinks.
*/
@Deprecated
public abstract class KafkaTableSinkBaseTestBase {
private static final String TOPIC = "testTopic";
private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
private static final Properties PROPERTIES = createSinkProperties();
@SuppressWarnings("unchecked")
@Test
public void testKafkaTableSink() {
DataStream dataStream = mock(DataStream.class);
when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class));
KafkaTableSinkBase kafkaTableSink = spy(createTableSink());
kafkaTableSink.emitDataStream(dataStream);
// verify correct producer class
verify(dataStream).addSink(any(getProducerClass()));
// verify correctly configured producer
verify(kafkaTableSink).createKafkaProducer(
eq(TOPIC),
eq(PROPERTIES),
any(getSerializationSchemaClass()),
eq(Optional.of(PARTITIONER)));
}
@Test
public void testConfiguration() {
KafkaTableSinkBase kafkaTableSink = createTableSink();
KafkaTableSinkBase newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES);
assertNotSame(kafkaTableSink, newKafkaTableSink);
assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames());
assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes());
assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
}
protected abstract KafkaTableSinkBase createTableSink(
String topic,
Properties properties,
FlinkKafkaPartitioner<Row> partitioner);
protected abstract Class<? extends SerializationSchema<Row>> getSerializationSchemaClass();
protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
private KafkaTableSinkBase createTableSink() {
KafkaTableSinkBase sink = createTableSink(TOPIC, PROPERTIES, PARTITIONER);
return sink.configure(FIELD_NAMES, FIELD_TYPES);
}
private static Properties createSinkProperties() {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:12345");
return properties;
}
private static class CustomPartitioner extends FlinkKafkaPartitioner<Row> {
@Override
public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
return 0;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.ExistingField;
import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps;
import org.apache.flink.types.Row;
import org.junit.Test;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Abstract test base for all format-specific Kafka table sources with builders.
*
* @deprecated Ensures backwards compatibility with Flink 1.5. Can be removed once we
* drop support for format-specific table sources.
*/
@Deprecated
public abstract class KafkaTableSourceBuilderTestBase {
static final String[] FIELD_NAMES =
new String[]{"field1", "field2", "time1", "time2", "field3"};
static final TypeInformation[] FIELD_TYPES =
new TypeInformation[]{Types.LONG(), Types.STRING(), Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.DOUBLE()};
private static final String TOPIC = "testTopic";
private static final TableSchema SCHEMA = new TableSchema(FIELD_NAMES, FIELD_TYPES);
private static final Properties PROPS = createSourceProperties();
@Test
@SuppressWarnings("unchecked")
public void testKafkaConsumer() {
KafkaTableSourceBase.Builder b = getBuilder();
configureBuilder(b);
// assert that the correct consumer and deserialization schema are used
KafkaTableSourceBase observed = spy(b.build());
StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
when(env.addSource(any(SourceFunction.class))).thenReturn(mock(DataStreamSource.class));
observed.getDataStream(env);
verify(env).addSource(any(getFlinkKafkaConsumer()));
verify(observed).getKafkaConsumer(
eq(TOPIC),
eq(PROPS),
any(getDeserializationSchema()));
}
@Test
public void testTableSchema() {
KafkaTableSourceBase.Builder b = getBuilder();
configureBuilder(b);
KafkaTableSourceBase source = b.build();
// check table schema
TableSchema schema = source.getTableSchema();
assertNotNull(schema);
assertEquals(5, schema.getFieldNames().length);
// check table fields
assertEquals("field1", schema.getFieldNames()[0]);
assertEquals("field2", schema.getFieldNames()[1]);
assertEquals("time1", schema.getFieldNames()[2]);
assertEquals("time2", schema.getFieldNames()[3]);
assertEquals("field3", schema.getFieldNames()[4]);
assertEquals(Types.LONG(), schema.getFieldTypes()[0]);
assertEquals(Types.STRING(), schema.getFieldTypes()[1]);
assertEquals(Types.SQL_TIMESTAMP(), schema.getFieldTypes()[2]);
assertEquals(Types.SQL_TIMESTAMP(), schema.getFieldTypes()[3]);
assertEquals(Types.DOUBLE(), schema.getFieldTypes()[4]);
}
@Test
public void testNoTimeAttributes() {
KafkaTableSourceBase.Builder b = getBuilder();
configureBuilder(b);
KafkaTableSourceBase source = b.build();
// assert no proctime
assertNull(source.getProctimeAttribute());
// assert no rowtime
assertNotNull(source.getRowtimeAttributeDescriptors());
assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
}
@Test
public void testProctimeAttribute() {
KafkaTableSourceBase.Builder b = getBuilder();
configureBuilder(b);
b.withProctimeAttribute("time1");
KafkaTableSourceBase source = b.build();
// assert correct proctime field
assertEquals("time1", source.getProctimeAttribute());
// assert no rowtime
assertNotNull(source.getRowtimeAttributeDescriptors());
assertTrue(source.getRowtimeAttributeDescriptors().isEmpty());
}
@Test
public void testRowtimeAttribute() {
KafkaTableSourceBase.Builder b = getBuilder();
configureBuilder(b);
b.withRowtimeAttribute("time2", new ExistingField("time2"), new AscendingTimestamps());
KafkaTableSourceBase source = b.build();
// assert no proctime
assertNull(source.getProctimeAttribute());
// assert correct rowtime descriptor
List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
assertNotNull(descs);
assertEquals(1, descs.size());
RowtimeAttributeDescriptor desc = descs.get(0);
assertEquals("time2", desc.getAttributeName());
// assert timestamp extractor
assertTrue(desc.getTimestampExtractor() instanceof ExistingField);
assertEquals(1, desc.getTimestampExtractor().getArgumentFields().length);
assertEquals("time2", desc.getTimestampExtractor().getArgumentFields()[0]);
// assert watermark strategy
assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
}
@Test
public void testRowtimeAttribute2() {
KafkaTableSourceBase.Builder b = getBuilder();
configureBuilder(b);
try {
b.withKafkaTimestampAsRowtimeAttribute("time2", new AscendingTimestamps());
KafkaTableSourceBase source = b.build();
// assert no proctime
assertNull(source.getProctimeAttribute());
// assert correct rowtime descriptor
List<RowtimeAttributeDescriptor> descs = source.getRowtimeAttributeDescriptors();
assertNotNull(descs);
assertEquals(1, descs.size());
RowtimeAttributeDescriptor desc = descs.get(0);
assertEquals("time2", desc.getAttributeName());
// assert timestamp extractor
assertTrue(desc.getTimestampExtractor() instanceof StreamRecordTimestamp);
assertEquals(0, desc.getTimestampExtractor().getArgumentFields().length);
// assert watermark strategy
assertTrue(desc.getWatermarkStrategy() instanceof AscendingTimestamps);
} catch (Exception e) {
if (b.supportsKafkaTimestamps()) {
// builder should support Kafka timestamps
fail();
}
}
}
@Test
@SuppressWarnings("unchecked")
public void testConsumerOffsets() {
KafkaTableSourceBase.Builder b = getBuilder();
configureBuilder(b);
// test the default behavior
KafkaTableSourceBase source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();
// test reading from earliest
b.fromEarliest();
source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromEarliest();
// test reading from latest
b.fromLatest();
source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromLatest();
// test reading from group offsets
b.fromGroupOffsets();
source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();
// test reading from given offsets
b.fromSpecificOffsets(mock(Map.class));
source = spy(b.build());
when(source.createKafkaConsumer(TOPIC, PROPS, null))
.thenReturn(mock(getFlinkKafkaConsumer()));
verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromSpecificOffsets(any(Map.class));
}
protected abstract KafkaTableSourceBase.Builder getBuilder();
protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();
protected abstract Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer();
protected void configureBuilder(KafkaTableSourceBase.Builder builder) {
builder
.forTopic(TOPIC)
.withKafkaProperties(PROPS)
.withSchema(SCHEMA);
}
private static Properties createSourceProperties() {
Properties properties = new Properties();
properties.setProperty("zookeeper.connect", "dummy");
properties.setProperty("group.id", "dummy");
return properties;
}
}
......@@ -85,22 +85,6 @@ under the License.
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-avro. -->
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<!-- Projects depending on this project, won't depend on flink-json. -->
<optional>true</optional>
</dependency>
<!-- test dependencies -->
<dependency>
......@@ -111,23 +95,6 @@ under the License.
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
......