diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java index fd201e4996771884fdc3307504de0b80e8bde73e..a2ae56d23628e2d0119b042e768fc8307035f1d8 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java @@ -30,8 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper; import org.apache.flink.util.SerializedValue; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -87,7 +86,7 @@ public class FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 { /** * Creates a new Kafka streaming source consumer for Kafka 0.10.x * - *
<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param topic @@ -97,7 +96,7 @@ public class FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 { * @param props * The properties used to configure the Kafka consumer client, and the ZooKeeper client. */ - public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema deserializer, Properties props) { this(Collections.singletonList(topic), deserializer, props); } @@ -114,7 +113,7 @@ public class FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 { * The properties that are used to configure both the fetcher and the offset handler. */ public FlinkKafkaConsumer010(List topics, DeserializationSchema deserializer, Properties props) { - this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props); } /** @@ -129,7 +128,7 @@ public class FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 { * @param props * The properties that are used to configure both the fetcher and the offset handler. */ - public FlinkKafkaConsumer010(List topics, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer010(List topics, KafkaDeserializationSchema deserializer, Properties props) { super(topics, deserializer, props); } @@ -150,7 +149,7 @@ public class FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 { */ @PublicEvolving public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema valueDeserializer, Properties props) { - this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props); + this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props); } /** @@ -161,7 +160,7 @@ public class FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 { * {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics * with names matching the pattern will also be subscribed to as they are created on the fly. * - *
<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param subscriptionPattern @@ -172,7 +171,7 @@ public class FlinkKafkaConsumer010 extends FlinkKafkaConsumer09 { * The properties used to configure the Kafka consumer client, and the ZooKeeper client. */ @PublicEvolving - public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema deserializer, Properties props) { super(subscriptionPattern, deserializer, props); } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java index 9b9b217b205f076f445e5d9541d7ba13f6478f8d..b0b27527adc8cb726d5ff7f793784894574f13dc 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java @@ -23,10 +23,10 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -55,7 +55,7 @@ public class Kafka010Fetcher extends Kafka09Fetcher { long autoWatermarkInterval, ClassLoader userCodeClassLoader, String taskNameWithSubtasks, - KeyedDeserializationSchema deserializer, + KafkaDeserializationSchema deserializer, Properties kafkaProperties, long pollTimeout, MetricGroup subtaskMetricGroup, diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java index 45ab95842e2099a52d47cf9b4be2fb64b00312df..ace78693686c87f9b69b635cc21f9c3f26ded98b 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java @@ -36,9 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import 
org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Ignore; import org.junit.Test; @@ -310,7 +310,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase { } } - private static class LimitedLongDeserializer implements KeyedDeserializationSchema { + private static class LimitedLongDeserializer implements KafkaDeserializationSchema { private static final long serialVersionUID = 6966177118923713521L; private final TypeInformation ti; @@ -328,9 +328,9 @@ public class Kafka010ITCase extends KafkaConsumerTestBase { } @Override - public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + public Long deserialize(ConsumerRecord record) throws IOException { cnt++; - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value())); Long e = ser.deserialize(in); return e; } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 72d9cb56330d7af7ad48655bd1558f31318bdce6..8c69f36356727de01fdff5b87f09fd41676cf9b2 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; @@ -120,7 +119,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public FlinkKafkaConsumerBase getConsumer(List topics, KeyedDeserializationSchema readSchema, Properties props) { + public FlinkKafkaConsumerBase getConsumer(List topics, KafkaDeserializationSchema readSchema, Properties props) { return new FlinkKafkaConsumer010<>(topics, readSchema, props); } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java index f57fbea67e1472c862f40f5d578a99298e79f502..8c8a55f2155285bd7b525f5439158b67466548bc 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java @@ -24,12 +24,12 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.watermark.Watermark; +import 
org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -115,7 +115,7 @@ public class Kafka010FetcherTest { SourceContext sourceContext = mock(SourceContext.class); Map partitionsWithInitialOffsets = Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); - KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + KafkaDeserializationSchema schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( sourceContext, @@ -252,7 +252,7 @@ public class Kafka010FetcherTest { SourceContext sourceContext = mock(SourceContext.class); Map partitionsWithInitialOffsets = Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); - KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + KafkaDeserializationSchema schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( sourceContext, @@ -367,7 +367,7 @@ public class Kafka010FetcherTest { BlockingSourceContext sourceContext = new BlockingSourceContext<>(); Map partitionsWithInitialOffsets = Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); - KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + KafkaDeserializationSchema schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka010Fetcher fetcher = new Kafka010Fetcher<>( sourceContext, diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java index 2b8d3f3795982266f51d61c7662fc90f7521cec2..4dd75b5a901bc01c44f9b0c1f5a2d1bc7e8efa45 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java @@ -19,8 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper; import java.util.Collections; import 
java.util.List; @@ -68,7 +67,7 @@ public class FlinkKafkaConsumer011 extends FlinkKafkaConsumer010 { /** * Creates a new Kafka streaming source consumer for Kafka 0.11.x * - *
<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param topic @@ -78,7 +77,7 @@ public class FlinkKafkaConsumer011 extends FlinkKafkaConsumer010 { * @param props * The properties used to configure the Kafka consumer client, and the ZooKeeper client. */ - public FlinkKafkaConsumer011(String topic, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer011(String topic, KafkaDeserializationSchema deserializer, Properties props) { this(Collections.singletonList(topic), deserializer, props); } @@ -95,7 +94,7 @@ public class FlinkKafkaConsumer011 extends FlinkKafkaConsumer010 { * The properties that are used to configure both the fetcher and the offset handler. */ public FlinkKafkaConsumer011(List topics, DeserializationSchema deserializer, Properties props) { - this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props); } /** @@ -110,7 +109,7 @@ public class FlinkKafkaConsumer011 extends FlinkKafkaConsumer010 { * @param props * The properties that are used to configure both the fetcher and the offset handler. */ - public FlinkKafkaConsumer011(List topics, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer011(List topics, KafkaDeserializationSchema deserializer, Properties props) { super(topics, deserializer, props); } @@ -131,7 +130,7 @@ public class FlinkKafkaConsumer011 extends FlinkKafkaConsumer010 { */ @PublicEvolving public FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema valueDeserializer, Properties props) { - this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props); + this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props); } /** @@ -142,7 +141,7 @@ public class FlinkKafkaConsumer011 extends FlinkKafkaConsumer010 { * {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics * with names matching the pattern will also be subscribed to as they are created on the fly. * - *
<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param subscriptionPattern @@ -153,7 +152,7 @@ public class FlinkKafkaConsumer011 extends FlinkKafkaConsumer010 { * The properties used to configure the Kafka consumer client, and the ZooKeeper client. */ @PublicEvolving - public FlinkKafkaConsumer011(Pattern subscriptionPattern, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer011(Pattern subscriptionPattern, KafkaDeserializationSchema deserializer, Properties props) { super(subscriptionPattern, deserializer, props); } } diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java index 8692b5a6963f696641142149473c210c1a2f56a5..77ae08ee337368f3b012395af7ac972811bd5f67 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java @@ -36,9 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.BeforeClass; import org.junit.Test; @@ -319,7 +319,7 @@ public class Kafka011ITCase extends KafkaConsumerTestBase { } } - private static class LimitedLongDeserializer implements KeyedDeserializationSchema { + private static class LimitedLongDeserializer implements KafkaDeserializationSchema { private static final long serialVersionUID = 6966177118923713521L; private final TypeInformation ti; @@ -337,9 +337,9 @@ public class Kafka011ITCase extends KafkaConsumerTestBase { } @Override - public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + public Long deserialize(ConsumerRecord record) throws IOException { cnt++; - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value())); Long e = ser.deserialize(in); return e; } @@ -349,5 +349,4 @@ public class Kafka011ITCase extends KafkaConsumerTestBase { return cnt > 1110L; } } - } diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index e81148b691ccf1b11d8bd97bfbff3fc38c3bd9e7..40728baac2359ba82286dc88374ce1cf5933556c 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -24,7 +24,6 @@ import org.apache.flink.streaming.api.operators.StreamSink; 
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; @@ -127,7 +126,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public FlinkKafkaConsumerBase getConsumer(List topics, KeyedDeserializationSchema readSchema, Properties props) { + public FlinkKafkaConsumerBase getConsumer(List topics, KafkaDeserializationSchema readSchema, Properties props) { return new FlinkKafkaConsumer011<>(topics, readSchema, props); } diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index 86f69cd3f6e2b4d65cb8b2684c5da2ab04c6a584..e88bdc4363d9277e473f23ab0537eb62c1bde999 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -30,8 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher; import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper; import org.apache.flink.util.PropertiesUtil; import org.apache.flink.util.SerializedValue; @@ -119,7 +118,7 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { /** * Creates a new Kafka streaming source consumer for Kafka 0.8.x * - *
<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param topic @@ -129,7 +128,7 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { * @param props * The properties used to configure the Kafka consumer client, and the ZooKeeper client. */ - public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer08(String topic, KafkaDeserializationSchema deserializer, Properties props) { this(Collections.singletonList(topic), deserializer, props); } @@ -146,7 +145,7 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { * The properties that are used to configure both the fetcher and the offset handler. */ public FlinkKafkaConsumer08(List topics, DeserializationSchema deserializer, Properties props) { - this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props); } /** @@ -161,7 +160,7 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { * @param props * The properties that are used to configure both the fetcher and the offset handler. */ - public FlinkKafkaConsumer08(List topics, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer08(List topics, KafkaDeserializationSchema deserializer, Properties props) { this(topics, null, deserializer, props); } @@ -182,7 +181,7 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { */ @PublicEvolving public FlinkKafkaConsumer08(Pattern subscriptionPattern, DeserializationSchema valueDeserializer, Properties props) { - this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props); + this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props); } /** @@ -193,7 +192,7 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { * {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics * with names matching the pattern will also be subscribed to as they are created on the fly. * - *
<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param subscriptionPattern @@ -204,14 +203,14 @@ public class FlinkKafkaConsumer08 extends FlinkKafkaConsumerBase { * The properties used to configure the Kafka consumer client, and the ZooKeeper client. */ @PublicEvolving - public FlinkKafkaConsumer08(Pattern subscriptionPattern, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer08(Pattern subscriptionPattern, KafkaDeserializationSchema deserializer, Properties props) { this(null, subscriptionPattern, deserializer, props); } private FlinkKafkaConsumer08( List topics, Pattern subscriptionPattern, - KeyedDeserializationSchema deserializer, + KafkaDeserializationSchema deserializer, Properties props) { super( diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java index 96540412f0cec962261325bb014dff32667cf105..74df5070617ce8099b330488d8edf745f37b77fa 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java @@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.SerializedValue; @@ -68,7 +68,7 @@ public class Kafka08Fetcher extends AbstractFetcher { // ------------------------------------------------------------------------ /** The schema to convert between Kafka's byte messages, and Flink's objects. */ - private final KeyedDeserializationSchema deserializer; + private final KafkaDeserializationSchema deserializer; /** The properties that configure the Kafka connection. 
*/ private final Properties kafkaConfig; @@ -94,7 +94,7 @@ public class Kafka08Fetcher extends AbstractFetcher { SerializedValue> watermarksPeriodic, SerializedValue> watermarksPunctuated, StreamingRuntimeContext runtimeContext, - KeyedDeserializationSchema deserializer, + KafkaDeserializationSchema deserializer, Properties kafkaProperties, long autoCommitInterval, MetricGroup consumerMetricGroup, @@ -387,7 +387,7 @@ public class Kafka08Fetcher extends AbstractFetcher { ExceptionProxy errorHandler) throws IOException, ClassNotFoundException { // each thread needs its own copy of the deserializer, because the deserializer is // not necessarily thread safe - final KeyedDeserializationSchema clonedDeserializer = + final KafkaDeserializationSchema clonedDeserializer = InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader()); // seed thread with list of fetch partitions (otherwise it would shut down immediately again diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java index 1fdff9d9c8d36d893b199f8422752f9a6565f2cc..37cf39b8bc0590a866ded8e98a7042fac7fcfbc1 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java @@ -19,7 +19,7 @@ package org.apache.flink.streaming.connectors.kafka.internals; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.util.ExceptionUtils; import kafka.api.FetchRequestBuilder; @@ -32,6 +32,7 @@ import kafka.javaapi.OffsetResponse; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.MessageAndOffset; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.Node; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +69,7 @@ class SimpleConsumerThread extends Thread { private final Kafka08Fetcher owner; - private final KeyedDeserializationSchema deserializer; + private final KafkaDeserializationSchema deserializer; private final List> partitions; @@ -104,7 +105,7 @@ class SimpleConsumerThread extends Thread { Node broker, List> seedPartitions, ClosableBlockingQueue> unassignedPartitions, - KeyedDeserializationSchema deserializer, + KafkaDeserializationSchema deserializer, long invalidOffsetBehavior) { this.owner = owner; this.errorHandler = errorHandler; @@ -370,8 +371,10 @@ class SimpleConsumerThread extends Thread { keyPayload.get(keyBytes); } - final T value = deserializer.deserialize(keyBytes, valueBytes, - currentPartition.getTopic(), currentPartition.getPartition(), offset); + final T value = deserializer.deserialize( + new ConsumerRecord<>( + currentPartition.getTopic(), + currentPartition.getPartition(), keyBytes, valueBytes, offset)); if (deserializer.isEndOfStream(value)) { // remove partition from subscribed partitions. 
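For orientation only (this snippet is not part of the patch): a minimal sketch of a job wiring the reworked 0.8 consumer constructor shown above. The topic name, group id, and broker/ZooKeeper addresses are placeholders; JSONKeyValueDeserializationSchema can be handed in directly because, further down in this change, it now implements KafkaDeserializationSchema.

import java.util.Properties;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class Kafka08SourceExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder broker address
		props.setProperty("zookeeper.connect", "localhost:2181");  // required by the 0.8 consumer
		props.setProperty("group.id", "example-group");            // placeholder group id

		// The single-topic constructor now takes a KafkaDeserializationSchema-typed argument.
		DataStream<ObjectNode> stream = env.addSource(
			new FlinkKafkaConsumer08<>("example-topic", new JSONKeyValueDeserializationSchema(true), props));

		stream.print();
		env.execute("kafka-08-example");
	}
}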
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index d55edbeb70e140e8e362995b7bf6ed75857e40a0..cddef88b904b626e0d52153fd811067244cc59b7 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -27,7 +27,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescript import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; @@ -112,7 +111,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public FlinkKafkaConsumerBase getConsumer(List topics, KeyedDeserializationSchema readSchema, Properties props) { + public FlinkKafkaConsumerBase getConsumer(List topics, KafkaDeserializationSchema readSchema, Properties props) { return new FlinkKafkaConsumer08<>(topics, readSchema, props); } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index 37435e854a6ddc39fa7646d3704c115e3ba9e35c..63e7bff973d9547399b32bb06e8f175557fbfcbb 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -30,8 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper; import org.apache.flink.util.PropertiesUtil; import org.apache.flink.util.SerializedValue; @@ -110,7 +109,7 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { /** * Creates a new Kafka streaming source consumer for Kafka 0.9.x * - *
<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param topic @@ -120,7 +119,7 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { * @param props * The properties used to configure the Kafka consumer client, and the ZooKeeper client. */ - public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer09(String topic, KafkaDeserializationSchema deserializer, Properties props) { this(Collections.singletonList(topic), deserializer, props); } @@ -137,7 +136,7 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { * The properties that are used to configure both the fetcher and the offset handler. */ public FlinkKafkaConsumer09(List topics, DeserializationSchema deserializer, Properties props) { - this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props); } /** @@ -152,7 +151,7 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { * @param props * The properties that are used to configure both the fetcher and the offset handler. */ - public FlinkKafkaConsumer09(List topics, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer09(List topics, KafkaDeserializationSchema deserializer, Properties props) { this(topics, null, deserializer, props); } @@ -173,7 +172,7 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { */ @PublicEvolving public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema valueDeserializer, Properties props) { - this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props); + this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props); } /** @@ -184,7 +183,7 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { * {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics * with names matching the pattern will also be subscribed to as they are created on the fly. * - *
<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param subscriptionPattern @@ -195,14 +194,14 @@ public class FlinkKafkaConsumer09 extends FlinkKafkaConsumerBase { * The properties used to configure the Kafka consumer client, and the ZooKeeper client. */ @PublicEvolving - public FlinkKafkaConsumer09(Pattern subscriptionPattern, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer09(Pattern subscriptionPattern, KafkaDeserializationSchema deserializer, Properties props) { this(null, subscriptionPattern, deserializer, props); } private FlinkKafkaConsumer09( List topics, Pattern subscriptionPattern, - KeyedDeserializationSchema deserializer, + KafkaDeserializationSchema deserializer, Properties props) { super( diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java index 852d56f86bec8600009f161bbed0f1a54f65d797..f727abd2fa83404f176b4811148d84ed7e3184da 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java @@ -23,12 +23,12 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -60,7 +60,7 @@ public class Kafka09Fetcher extends AbstractFetcher { // ------------------------------------------------------------------------ /** The schema to convert between Kafka's byte messages, and Flink's objects. */ - private final KeyedDeserializationSchema deserializer; + private final KafkaDeserializationSchema deserializer; /** The handover of data and exceptions between the consumer thread and the task thread. 
*/ private final Handover handover; @@ -82,7 +82,7 @@ public class Kafka09Fetcher extends AbstractFetcher { long autoWatermarkInterval, ClassLoader userCodeClassLoader, String taskNameWithSubtasks, - KeyedDeserializationSchema deserializer, + KafkaDeserializationSchema deserializer, Properties kafkaProperties, long pollTimeout, MetricGroup subtaskMetricGroup, @@ -139,9 +139,8 @@ public class Kafka09Fetcher extends AbstractFetcher { records.records(partition.getKafkaPartitionHandle()); for (ConsumerRecord record : partitionRecords) { - final T value = deserializer.deserialize( - record.key(), record.value(), - record.topic(), record.partition(), record.offset()); + + final T value = deserializer.deserialize(record); if (deserializer.isEndOfStream(value)) { // end of stream signaled diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index e778b738530ab091831164377546f03ff0a5d6fd..2ed600ba7e3f9c1ed645f7d83bbd46f95fe60c9d 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; @@ -103,7 +102,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public FlinkKafkaConsumerBase getConsumer(List topics, KeyedDeserializationSchema readSchema, Properties props) { + public FlinkKafkaConsumerBase getConsumer(List topics, KafkaDeserializationSchema readSchema, Properties props) { return new FlinkKafkaConsumer09<>(topics, readSchema, props); } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java index 27b67f10984138696264bbd1341be1a5d50e1eeb..cb866998d5afea9480256fec4aac637fdbc579ca 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java @@ -24,12 +24,12 @@ import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -115,7 +115,7 @@ public class Kafka09FetcherTest { SourceContext sourceContext = mock(SourceContext.class); Map partitionsWithInitialOffsets = Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); - KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + KafkaDeserializationSchema schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( sourceContext, @@ -251,7 +251,7 @@ public class Kafka09FetcherTest { SourceContext sourceContext = mock(SourceContext.class); Map partitionsWithInitialOffsets = Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); - KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + KafkaDeserializationSchema schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( sourceContext, @@ -366,7 +366,7 @@ public class Kafka09FetcherTest { BlockingSourceContext sourceContext = new BlockingSourceContext<>(); Map partitionsWithInitialOffsets = Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET); - KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + KafkaDeserializationSchema schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema()); final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( sourceContext, diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java index bf8f29e7e279168122f4b19bd3920dd59a426d23..ad9fc4fa21101a9b87e21f849a3eecb1304d1fb9 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java @@ -49,7 +49,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.SerializedValue; @@ -119,7 +118,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti private final KafkaTopicsDescriptor topicsDescriptor; /** The schema to convert between Kafka's byte messages, and 
Flink's objects. */ - protected final KeyedDeserializationSchema deserializer; + protected final KafkaDeserializationSchema deserializer; /** The set of topic partitions that the source will read, with their initial offsets to start reading from. */ private Map subscribedPartitionsToStartOffsets; @@ -236,7 +235,7 @@ public abstract class FlinkKafkaConsumerBase extends RichParallelSourceFuncti public FlinkKafkaConsumerBase( List topics, Pattern topicPattern, - KeyedDeserializationSchema deserializer, + KafkaDeserializationSchema deserializer, long discoveryIntervalMillis, boolean useMetrics) { this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern); diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java new file mode 100644 index 0000000000000000000000000000000000000000..8449f1c2e4862c8a9255b65ea3da42a1448fc3f5 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.io.Serializable; + +/** + * The deserialization schema describes how to turn the Kafka ConsumerRecords + * into data types (Java/Scala objects) that are processed by Flink. + * + * @param The type created by the keyed deserialization schema. + */ +@PublicEvolving +public interface KafkaDeserializationSchema extends Serializable, ResultTypeQueryable { + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * @param nextElement The element to test for the end-of-stream signal. + * + * @return True, if the element signals end of stream, false otherwise. + */ + boolean isEndOfStream(T nextElement); + + /** + * Deserializes the Kafka record. + * + * @param record Kafka record to be deserialized. + * + * @return The deserialized message as an object (null if the message cannot be deserialized). 
+ */ + T deserialize(ConsumerRecord record) throws Exception; +} diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java similarity index 78% rename from flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java rename to flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java index 06289e5069a7316028e0b0f610ea4cfef69ccc13..a00f3d9ac5a4f4ba15069696f3f6908fc7876cce 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaDeserializationSchemaWrapper.java @@ -20,28 +20,29 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; -import java.io.IOException; +import org.apache.kafka.clients.consumer.ConsumerRecord; /** - * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema + * A simple wrapper for using the DeserializationSchema with the KafkaDeserializationSchema * interface. * @param The type created by the deserialization schema. 
*/ @Internal -public class KeyedDeserializationSchemaWrapper implements KeyedDeserializationSchema { +public class KafkaDeserializationSchemaWrapper implements KafkaDeserializationSchema { private static final long serialVersionUID = 2651665280744549932L; private final DeserializationSchema deserializationSchema; - public KeyedDeserializationSchemaWrapper(DeserializationSchema deserializationSchema) { + public KafkaDeserializationSchemaWrapper(DeserializationSchema deserializationSchema) { this.deserializationSchema = deserializationSchema; } @Override - public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - return deserializationSchema.deserialize(message); + public T deserialize(ConsumerRecord record) throws Exception { + return deserializationSchema.deserialize(record.value()); } @Override diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java index 95b73e1ea49efcfe76768147f9ee72e4ecc7732f..6b2b63efa49bf667cbc1d3572e4d4f5658c826b0 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONKeyValueDeserializationSchema.java @@ -19,12 +19,13 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; -import java.io.IOException; +import org.apache.kafka.clients.consumer.ConsumerRecord; import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; @@ -39,7 +40,7 @@ import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass; * the "offset" (long), "topic" (String) and "partition" (int). 
*/ @PublicEvolving -public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema { +public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema { private static final long serialVersionUID = 1509391548173891955L; @@ -51,22 +52,22 @@ public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSc } @Override - public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + public ObjectNode deserialize(ConsumerRecord record) throws Exception { if (mapper == null) { mapper = new ObjectMapper(); } ObjectNode node = mapper.createObjectNode(); - if (messageKey != null) { - node.set("key", mapper.readValue(messageKey, JsonNode.class)); + if (record.key() != null) { + node.set("key", mapper.readValue(record.key(), JsonNode.class)); } - if (message != null) { - node.set("value", mapper.readValue(message, JsonNode.class)); + if (record.value() != null) { + node.set("value", mapper.readValue(record.value(), JsonNode.class)); } if (includeMetadata) { node.putObject("metadata") - .put("offset", offset) - .put("topic", topic) - .put("partition", partition); + .put("offset", record.offset()) + .put("topic", record.topic()) + .put("partition", record.partition()); } return node; } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java index 0ef6fd577db4a7989f8f84f25288ffdf92e7e8dc..a18bd909cc27ed28a2fe224b94d369d07964e769 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java @@ -18,10 +18,11 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecord; import java.io.IOException; -import java.io.Serializable; /** * The deserialization schema describes how to turn the byte key / value messages delivered by certain @@ -29,10 +30,12 @@ import java.io.Serializable; * processed by Flink. * * @param The type created by the keyed deserialization schema. + * + * @deprecated Use {@link KafkaDeserializationSchema}. */ +@Deprecated @PublicEvolving -public interface KeyedDeserializationSchema extends Serializable, ResultTypeQueryable { - +public interface KeyedDeserializationSchema extends KafkaDeserializationSchema { /** * Deserializes the byte message. * @@ -45,13 +48,8 @@ public interface KeyedDeserializationSchema extends Serializable, ResultTypeQ */ T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException; - /** - * Method to decide whether the element signals the end of the stream. If - * true is returned the element won't be emitted. - * - * @param nextElement The element to test for the end-of-stream signal. - * - * @return True, if the element signals end of stream, false otherwise. 
- */ - boolean isEndOfStream(T nextElement); + @Override + default T deserialize(ConsumerRecord record) throws IOException { + return deserialize(record.key(), record.value(), record.topic(), record.partition(), record.offset()); + } } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java index cc4a54bfeef98305de0ab75d01163fe4211ceaf6..3c2d0db3063d2c409db1a7bf46e12a5661e2534b 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java @@ -27,6 +27,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecord; import java.io.IOException; @@ -38,7 +41,7 @@ import java.io.IOException; * @param The value type to be serialized. */ @Internal -public class TypeInformationKeyValueSerializationSchema implements KeyedDeserializationSchema>, KeyedSerializationSchema> { +public class TypeInformationKeyValueSerializationSchema implements KafkaDeserializationSchema>, KeyedSerializationSchema> { private static final long serialVersionUID = -5359448468131559102L; @@ -96,16 +99,16 @@ public class TypeInformationKeyValueSerializationSchema implements KeyedDe // ------------------------------------------------------------------------ @Override - public Tuple2 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + public Tuple2 deserialize(ConsumerRecord record) throws Exception { K key = null; V value = null; - if (messageKey != null) { - inputDeserializer.setBuffer(messageKey); + if (record.key() != null) { + inputDeserializer.setBuffer(record.key()); key = keySerializer.deserialize(inputDeserializer); } - if (message != null) { - inputDeserializer.setBuffer(message); + if (record.value() != null) { + inputDeserializer.setBuffer(record.value()); value = valueSerializer.deserialize(inputDeserializer); } return new Tuple2<>(key, value); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java index 27c10a9ccebb4ce1742068b1373815da0f9db37e..6d8f1fa111eea00ac5128177df89c88abf552306 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseMigrationTest.java @@ -36,7 +36,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; import 
org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.OperatorSnapshotUtil; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.testutils.migration.MigrationVersion; import org.apache.flink.util.SerializedValue; @@ -370,7 +369,7 @@ public class FlinkKafkaConsumerBaseMigrationTest { super( Arrays.asList("dummy-topic"), null, - (KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class), + (KafkaDeserializationSchema< T >) mock(KafkaDeserializationSchema.class), discoveryInterval, false); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java index e59e2a6dd3384bc18d874f487cb98a442985eccf..e464a1942ddbeddd3122fa222c166e2d8751f775 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java @@ -51,7 +51,6 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -904,7 +903,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { super( Collections.singletonList("dummy-topic"), null, - (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class), + (KafkaDeserializationSchema< T >) mock(KafkaDeserializationSchema.class), discoveryIntervalMillis, false); @@ -957,7 +956,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger { TestingFlinkKafkaConsumer(final AbstractPartitionDiscoverer partitionDiscoverer, long discoveryIntervalMillis) { super(Collections.singletonList("dummy-topic"), null, - (KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class), + (KafkaDeserializationSchema < T >) mock(KafkaDeserializationSchema.class), discoveryIntervalMillis, false); this.partitionDiscoverer = partitionDiscoverer; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java index 423f637c1f942e79cc74d90dbbbc09de0e51b636..9bb0276f42fe180cae403a6faa56b4546d722590 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JSONKeyValueDeserializationSchemaTest.java @@ -22,17 +22,17 @@ import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserialization import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.Assert; import org.junit.Test; -import java.io.IOException; - /** * Tests for the{@link JSONKeyValueDeserializationSchema}. */ public class JSONKeyValueDeserializationSchemaTest { + @Test - public void testDeserializeWithoutMetadata() throws IOException { + public void testDeserializeWithoutMetadata() throws Exception { ObjectMapper mapper = new ObjectMapper(); ObjectNode initialKey = mapper.createObjectNode(); initialKey.put("index", 4); @@ -43,7 +43,7 @@ public class JSONKeyValueDeserializationSchemaTest { byte[] serializedValue = mapper.writeValueAsBytes(initialValue); JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); - ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0); + ObjectNode deserializedValue = schema.deserialize(newConsumerRecord(serializedKey, serializedValue)); Assert.assertTrue(deserializedValue.get("metadata") == null); Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); @@ -51,7 +51,7 @@ public class JSONKeyValueDeserializationSchemaTest { } @Test - public void testDeserializeWithoutKey() throws IOException { + public void testDeserializeWithoutKey() throws Exception { ObjectMapper mapper = new ObjectMapper(); byte[] serializedKey = null; @@ -60,15 +60,26 @@ public class JSONKeyValueDeserializationSchemaTest { byte[] serializedValue = mapper.writeValueAsBytes(initialValue); JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); - ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0); + ObjectNode deserializedValue = schema.deserialize(newConsumerRecord(serializedKey, serializedValue)); Assert.assertTrue(deserializedValue.get("metadata") == null); Assert.assertTrue(deserializedValue.get("key") == null); Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); } + private static ConsumerRecord newConsumerRecord( + byte[] serializedKey, byte[] serializedValue) { + return newConsumerRecord("", 0, 0L, serializedKey, serializedValue); + } + + private static ConsumerRecord newConsumerRecord( + String topic, int partition, long offset, byte[] serializedKey, byte[] serializedValue) { + + return new ConsumerRecord<>(topic, partition, offset, serializedKey, serializedValue); + } + @Test - public void testDeserializeWithoutValue() throws IOException { + public void testDeserializeWithoutValue() throws Exception { ObjectMapper mapper = new ObjectMapper(); ObjectNode initialKey = mapper.createObjectNode(); initialKey.put("index", 4); @@ -77,7 +88,7 @@ public class JSONKeyValueDeserializationSchemaTest { byte[] serializedValue = null; JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false); - ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0); + ObjectNode deserializedValue = schema.deserialize(newConsumerRecord(serializedKey, serializedValue)); Assert.assertTrue(deserializedValue.get("metadata") == null); Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); @@ -85,7 +96,7 @@ public class JSONKeyValueDeserializationSchemaTest { } @Test - public void testDeserializeWithMetadata() throws IOException { + public void testDeserializeWithMetadata() throws Exception { ObjectMapper mapper = new ObjectMapper(); ObjectNode initialKey = 
mapper.createObjectNode(); initialKey.put("index", 4); @@ -96,7 +107,9 @@ public class JSONKeyValueDeserializationSchemaTest { byte[] serializedValue = mapper.writeValueAsBytes(initialValue); JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true); - ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4); + final ConsumerRecord consumerRecord = + newConsumerRecord("topic#1", 3, 4L, serializedKey, serializedValue); + ObjectNode deserializedValue = schema.deserialize(consumerRecord); Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt()); Assert.assertEquals("world", deserializedValue.get("value").get("word").asText()); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index 6e2464b4c48d9c153ddcc1fd4ad81c9451d534b5..a8924e73f02511bbf3ae92dbc5c033cbf26ced72 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -65,8 +65,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidating import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper; import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema; @@ -80,6 +79,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; import kafka.server.KafkaServer; import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.TimeoutException; import org.junit.Assert; @@ -421,8 +421,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { new KeyedSerializationSchemaWrapper<>( new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); - final KeyedDeserializationSchema> deserSchema = - new KeyedDeserializationSchemaWrapper<>( + final KafkaDeserializationSchema> deserSchema = + new KafkaDeserializationSchemaWrapper<>( new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); // setup and run the latest-consuming job @@ -1417,7 +1417,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { env.setRestartStrategy(RestartStrategies.noRestart()); env.getConfig().disableSysoutLogging(); - KeyedDeserializationSchema> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig()); + KafkaDeserializationSchema> readSchema = new 
TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig()); Properties props = new Properties(); props.putAll(standardProps); @@ -1865,8 +1865,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { new KeyedSerializationSchemaWrapper<>( new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); - final KeyedDeserializationSchema> deserSchema = - new KeyedDeserializationSchemaWrapper<>( + final KafkaDeserializationSchema> deserSchema = + new KafkaDeserializationSchemaWrapper<>( new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); final int maxNumAttempts = 10; @@ -1963,8 +1963,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { new KeyedSerializationSchemaWrapper<>( new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); - final KeyedDeserializationSchema> deserSchema = - new KeyedDeserializationSchemaWrapper<>( + final KafkaDeserializationSchema> deserSchema = + new KafkaDeserializationSchemaWrapper<>( new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig())); // -------- Write the append sequence -------- @@ -2025,7 +2025,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { private boolean validateSequence( final String topic, final int parallelism, - KeyedDeserializationSchema> deserSchema, + KafkaDeserializationSchema> deserSchema, final int totalNumElements) throws Exception { final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -2194,7 +2194,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { } } - private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema>, + private static class Tuple2WithTopicSchema implements KafkaDeserializationSchema>, KeyedSerializationSchema> { private final TypeSerializer> ts; @@ -2204,10 +2204,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink { } @Override - public Tuple3 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + public Tuple3 deserialize(ConsumerRecord record) throws Exception { + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value())); Tuple2 t2 = ts.deserialize(in); - return new Tuple3<>(t2.f0, t2.f1, topic); + return new Tuple3<>(t2.f0, t2.f1, record.topic()); } @Override diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index 9a11e7f483418441ca522ce7f8155e5ed3e30e93..6c5ceaa65e6cf37c20a0376e34f881850164dd0a 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -30,11 +30,11 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; 
-import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.util.InstantiationUtil; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -42,7 +42,6 @@ import org.junit.rules.TemporaryFolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.io.Serializable; import java.util.Properties; @@ -188,12 +187,13 @@ public class KafkaShortRetentionTestBase implements Serializable { kafkaServer.deleteTestTopic(topic); } - private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema { + private class NonContinousOffsetsDeserializationSchema implements KafkaDeserializationSchema { private int numJumps; long nextExpected = 0; @Override - public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + public String deserialize(ConsumerRecord record) { + final long offset = record.offset(); if (offset != nextExpected) { numJumps++; nextExpected = offset; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 8a287d3766fa21f208b25a50f19ebfa392b3933c..9dd23eff37821acd6b112898357e14ac0303cf2c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -23,8 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import kafka.server.KafkaServer; @@ -128,10 +127,10 @@ public abstract class KafkaTestEnvironment { // -- consumer / producer instances: public FlinkKafkaConsumerBase getConsumer(List topics, DeserializationSchema deserializationSchema, Properties props) { - return getConsumer(topics, new KeyedDeserializationSchemaWrapper(deserializationSchema), props); + return getConsumer(topics, new KafkaDeserializationSchemaWrapper(deserializationSchema), props); } - public FlinkKafkaConsumerBase getConsumer(String topic, KeyedDeserializationSchema readSchema, Properties props) { + public FlinkKafkaConsumerBase getConsumer(String topic, KafkaDeserializationSchema readSchema, Properties props) { return getConsumer(Collections.singletonList(topic), readSchema, props); } @@ -139,7 +138,7 @@ public abstract class KafkaTestEnvironment { return getConsumer(Collections.singletonList(topic), deserializationSchema, props); } - public abstract FlinkKafkaConsumerBase getConsumer(List topics, 
KeyedDeserializationSchema readSchema, Properties props); + public abstract FlinkKafkaConsumerBase getConsumer(List topics, KafkaDeserializationSchema readSchema, Properties props); public abstract Collection> getAllRecordsFromTopic( Properties properties, diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java index 6b38e2ea2d100aeff14c39a995a8eeb3c3e6f66b..5a0828d9239e0acc3969a7b0f4e440c8581b7c90 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java @@ -30,8 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper; import org.apache.flink.util.PropertiesUtil; import org.apache.flink.util.SerializedValue; @@ -105,14 +104,14 @@ public class FlinkKafkaConsumer extends FlinkKafkaConsumerBase { /** * Creates a new Kafka streaming source consumer. * - *

<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param topic The name of the topic that should be consumed. * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. * @param props */ - public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema deserializer, Properties props) { this(Collections.singletonList(topic), deserializer, props); } @@ -126,7 +125,7 @@ public class FlinkKafkaConsumer extends FlinkKafkaConsumerBase { * @param props */ public FlinkKafkaConsumer(List topics, DeserializationSchema deserializer, Properties props) { - this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props); + this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props); } /** @@ -138,7 +137,7 @@ public class FlinkKafkaConsumer extends FlinkKafkaConsumerBase { * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. * @param props */ - public FlinkKafkaConsumer(List topics, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer(List topics, KafkaDeserializationSchema deserializer, Properties props) { this(topics, null, deserializer, props); } @@ -155,7 +154,7 @@ public class FlinkKafkaConsumer extends FlinkKafkaConsumerBase { * @param props */ public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema valueDeserializer, Properties props) { - this(null, subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props); + this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props); } /** @@ -166,21 +165,21 @@ public class FlinkKafkaConsumer extends FlinkKafkaConsumerBase { * {@link FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics * with names matching the pattern will also be subscribed to as they are created on the fly. * - *

<p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value + * <p>
This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value * pairs, offsets, and topic names from Kafka. * * @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to. * @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects. * @param props */ - public FlinkKafkaConsumer(Pattern subscriptionPattern, KeyedDeserializationSchema deserializer, Properties props) { + public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema deserializer, Properties props) { this(null, subscriptionPattern, deserializer, props); } private FlinkKafkaConsumer( List topics, Pattern subscriptionPattern, - KeyedDeserializationSchema deserializer, + KafkaDeserializationSchema deserializer, Properties props) { super( diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java index 59e63dd44f729bc62887d36af2879df0db815d84..9ec9921353c15f6cf52a666b97441767fe659228 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaFetcher.java @@ -22,12 +22,12 @@ import org.apache.flink.metrics.MetricGroup; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher; import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.util.SerializedValue; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -59,7 +59,7 @@ public class KafkaFetcher extends AbstractFetcher { // ------------------------------------------------------------------------ /** The schema to convert between Kafka's byte messages, and Flink's objects. */ - private final KeyedDeserializationSchema deserializer; + private final KafkaDeserializationSchema deserializer; /** The handover of data and exceptions between the consumer thread and the task thread. 
*/ private final Handover handover; @@ -81,7 +81,7 @@ public class KafkaFetcher extends AbstractFetcher { long autoWatermarkInterval, ClassLoader userCodeClassLoader, String taskNameWithSubtasks, - KeyedDeserializationSchema deserializer, + KafkaDeserializationSchema deserializer, Properties kafkaProperties, long pollTimeout, MetricGroup subtaskMetricGroup, @@ -137,9 +137,7 @@ public class KafkaFetcher extends AbstractFetcher { records.records(partition.getKafkaPartitionHandle()); for (ConsumerRecord record : partitionRecords) { - final T value = deserializer.deserialize( - record.key(), record.value(), - record.topic(), record.partition(), record.offset()); + final T value = deserializer.deserialize(record); if (deserializer.isEndOfStream(value)) { // end of stream signaled diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 11aee4a65fd928ce7d33fe0ec49c7011389bfe7f..57f8524a743b11aeb85ea6a8d0eda6c5b97617e8 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -36,9 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.BeforeClass; import org.junit.Test; @@ -319,7 +319,7 @@ public class KafkaITCase extends KafkaConsumerTestBase { } } - private static class LimitedLongDeserializer implements KeyedDeserializationSchema { + private static class LimitedLongDeserializer implements KafkaDeserializationSchema { private static final long serialVersionUID = 6966177118923713521L; private final TypeInformation ti; @@ -337,9 +337,9 @@ public class KafkaITCase extends KafkaConsumerTestBase { } @Override - public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException { + public Long deserialize(ConsumerRecord record) throws IOException { cnt++; - DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message)); + DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value())); Long e = ser.deserialize(in); return e; } @@ -349,5 +349,4 @@ public class KafkaITCase extends KafkaConsumerTestBase { return cnt > 1110L; } } - } diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java index 2f16904f633526d9e22a2055c76d6bbecd0c18bc..90e9e5de6d3c0ec9acfadf8f3a891c15b192637e 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java @@ -23,7 
+23,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.util.NetUtils; @@ -227,7 +226,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment { } @Override - public FlinkKafkaConsumerBase getConsumer(List topics, KeyedDeserializationSchema readSchema, Properties props) { + public FlinkKafkaConsumerBase getConsumer(List topics, KafkaDeserializationSchema readSchema, Properties props) { return new FlinkKafkaConsumer(topics, readSchema, props); }
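
For illustration only (not part of this patch): a minimal sketch of a user-defined schema written directly against the renamed KafkaDeserializationSchema interface and wired into the FlinkKafkaConsumer(String, KafkaDeserializationSchema, Properties) constructor changed above. The event type (Tuple2 of two strings), topic name, consumer group and broker address are placeholder assumptions chosen for the example.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;

import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Example schema exposing both the Kafka key and the value as UTF-8 strings. */
public class KeyValueStringSchema implements KafkaDeserializationSchema<Tuple2<String, String>> {

	private static final long serialVersionUID = 1L;

	@Override
	public Tuple2<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
		// The whole ConsumerRecord is available, so key, value and metadata
		// (topic, partition, offset, timestamp, headers) can all be read here.
		String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
		String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
		return new Tuple2<>(key, value);
	}

	@Override
	public boolean isEndOfStream(Tuple2<String, String> nextElement) {
		// Never signal end-of-stream; keep consuming indefinitely.
		return false;
	}

	@Override
	public TypeInformation<Tuple2<String, String>> getProducedType() {
		return TypeInformation.of(new TypeHint<Tuple2<String, String>>() {});
	}

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
		props.setProperty("group.id", "example-group");           // placeholder consumer group

		DataStream<Tuple2<String, String>> stream = env.addSource(
			new FlinkKafkaConsumer<>("example-topic", new KeyValueStringSchema(), props));

		stream.print();
		env.execute("KafkaDeserializationSchema example");
	}
}

Plain DeserializationSchema implementations need no change: the DeserializationSchema-taking constructors now wrap them in KafkaDeserializationSchemaWrapper instead of KeyedDeserializationSchemaWrapper, and existing KeyedDeserializationSchema implementations keep working through the deprecated interface's default deserialize(ConsumerRecord) method introduced in this change.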