Unverified commit 9e649517, authored by Alexey Trenikhin, committed by Aljoscha Krettek

[FLINK-8354] Add KafkaDeserializationSchema that uses ConsumerRecord

We now directly use the ConsumerRecord from the Kafka API instead of
trying to forward what we need to the deserialization schema ourselves.

This makes the interface more future-proof in case Kafka adds new fields to
the ConsumerRecord.

The previously used KeyedDeserializationSchema now extends
KafkaDeserializationSchema and has a default method to bridge the two
interfaces. This way, existing uses of KeyedDeserializationSchema continue
to work.
Parent: fa403a99
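
To illustrate the new interface described above, here is a minimal, hedged sketch of a user-defined schema built against KafkaDeserializationSchema. It is not part of this commit; the class name RecordWithOffsetSchema and the produced string format are hypothetical.

import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

/** Hypothetical schema turning each record into "topic/partition@offset: value". */
public class RecordWithOffsetSchema implements KafkaDeserializationSchema<String> {

	private static final long serialVersionUID = 1L;

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false; // never signal end of stream
	}

	@Override
	public String deserialize(ConsumerRecord<byte[], byte[]> record) {
		// The full ConsumerRecord is available, so topic, partition and offset can be
		// read directly instead of being passed as separate method arguments.
		return record.topic() + "/" + record.partition() + "@" + record.offset()
				+ ": " + new String(record.value(), StandardCharsets.UTF_8);
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}
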
......@@ -30,8 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.KafkaConsumer;
......@@ -87,7 +86,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.10.x
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param topic
......@@ -97,7 +96,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
public FlinkKafkaConsumer010(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer010(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
......@@ -114,7 +113,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer010(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
/**
......@@ -129,7 +128,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
* @param props
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer010(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props);
}
......@@ -150,7 +149,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
*/
@PublicEvolving
public FlinkKafkaConsumer010(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
/**
......@@ -161,7 +160,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
* {@link FlinkKafkaConsumer010#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param subscriptionPattern
......@@ -172,7 +171,7 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer010(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer010(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(subscriptionPattern, deserializer, props);
}
......
......@@ -23,10 +23,10 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
......@@ -55,7 +55,7 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
String taskNameWithSubtasks,
KeyedDeserializationSchema<T> deserializer,
KafkaDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
MetricGroup subtaskMetricGroup,
......
......@@ -36,9 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Ignore;
import org.junit.Test;
......@@ -310,7 +310,7 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
}
}
private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
private static class LimitedLongDeserializer implements KafkaDeserializationSchema<Long> {
private static final long serialVersionUID = 6966177118923713521L;
private final TypeInformation<Long> ti;
......@@ -328,9 +328,9 @@ public class Kafka010ITCase extends KafkaConsumerTestBase {
}
@Override
public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
public Long deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
cnt++;
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
Long e = ser.deserialize(in);
return e;
}
......
......@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
......@@ -120,7 +119,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KafkaDeserializationSchema<T> readSchema, Properties props) {
return new FlinkKafkaConsumer010<>(topics, readSchema, props);
}
......
......@@ -24,12 +24,12 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
......@@ -115,7 +115,7 @@ public class Kafka010FetcherTest {
SourceContext<String> sourceContext = mock(SourceContext.class);
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
KafkaDeserializationSchema<String> schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
......@@ -252,7 +252,7 @@ public class Kafka010FetcherTest {
SourceContext<String> sourceContext = mock(SourceContext.class);
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
KafkaDeserializationSchema<String> schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
......@@ -367,7 +367,7 @@ public class Kafka010FetcherTest {
BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
KafkaDeserializationSchema<String> schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
sourceContext,
......
......@@ -19,8 +19,7 @@ package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper;
import java.util.Collections;
import java.util.List;
......@@ -68,7 +67,7 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.11.x
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param topic
......@@ -78,7 +77,7 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
public FlinkKafkaConsumer011(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer011(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
......@@ -95,7 +94,7 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
/**
......@@ -110,7 +109,7 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
* @param props
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer011(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer011(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props);
}
......@@ -131,7 +130,7 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
*/
@PublicEvolving
public FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
/**
......@@ -142,7 +141,7 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
* {@link FlinkKafkaConsumer011#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param subscriptionPattern
......@@ -153,7 +152,7 @@ public class FlinkKafkaConsumer011<T> extends FlinkKafkaConsumer010<T> {
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer011(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer011(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(subscriptionPattern, deserializer, props);
}
}
......@@ -36,9 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -319,7 +319,7 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
}
}
private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
private static class LimitedLongDeserializer implements KafkaDeserializationSchema<Long> {
private static final long serialVersionUID = 6966177118923713521L;
private final TypeInformation<Long> ti;
......@@ -337,9 +337,9 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
}
@Override
public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
public Long deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
cnt++;
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
Long e = ser.deserialize(in);
return e;
}
......@@ -349,5 +349,4 @@ public class Kafka011ITCase extends KafkaConsumerTestBase {
return cnt > 1110L;
}
}
}
......@@ -24,7 +24,6 @@ import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
......@@ -127,7 +126,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KafkaDeserializationSchema<T> readSchema, Properties props) {
return new FlinkKafkaConsumer011<>(topics, readSchema, props);
}
......
......@@ -30,8 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
......@@ -119,7 +118,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.8.x
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param topic
......@@ -129,7 +128,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer08(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
......@@ -146,7 +145,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
/**
......@@ -161,7 +160,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
* @param props
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer08(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(topics, null, deserializer, props);
}
......@@ -182,7 +181,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
*/
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
/**
......@@ -193,7 +192,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
* {@link FlinkKafkaConsumer08#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param subscriptionPattern
......@@ -204,14 +203,14 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer08(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer08(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(null, subscriptionPattern, deserializer, props);
}
private FlinkKafkaConsumer08(
List<String> topics,
Pattern subscriptionPattern,
KeyedDeserializationSchema<T> deserializer,
KafkaDeserializationSchema<T> deserializer,
Properties props) {
super(
......
......@@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedValue;
......@@ -68,7 +68,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
// ------------------------------------------------------------------------
/** The schema to convert between Kafka's byte messages, and Flink's objects. */
private final KeyedDeserializationSchema<T> deserializer;
private final KafkaDeserializationSchema<T> deserializer;
/** The properties that configure the Kafka connection. */
private final Properties kafkaConfig;
......@@ -94,7 +94,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
KeyedDeserializationSchema<T> deserializer,
KafkaDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long autoCommitInterval,
MetricGroup consumerMetricGroup,
......@@ -387,7 +387,7 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
ExceptionProxy errorHandler) throws IOException, ClassNotFoundException {
// each thread needs its own copy of the deserializer, because the deserializer is
// not necessarily thread safe
final KeyedDeserializationSchema<T> clonedDeserializer =
final KafkaDeserializationSchema<T> clonedDeserializer =
InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader());
// seed thread with list of fetch partitions (otherwise it would shut down immediately again
......
......@@ -19,7 +19,7 @@
package org.apache.flink.streaming.connectors.kafka.internals;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.util.ExceptionUtils;
import kafka.api.FetchRequestBuilder;
......@@ -32,6 +32,7 @@ import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -68,7 +69,7 @@ class SimpleConsumerThread<T> extends Thread {
private final Kafka08Fetcher<T> owner;
private final KeyedDeserializationSchema<T> deserializer;
private final KafkaDeserializationSchema<T> deserializer;
private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions;
......@@ -104,7 +105,7 @@ class SimpleConsumerThread<T> extends Thread {
Node broker,
List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions,
KeyedDeserializationSchema<T> deserializer,
KafkaDeserializationSchema<T> deserializer,
long invalidOffsetBehavior) {
this.owner = owner;
this.errorHandler = errorHandler;
......@@ -370,8 +371,10 @@ class SimpleConsumerThread<T> extends Thread {
keyPayload.get(keyBytes);
}
final T value = deserializer.deserialize(keyBytes, valueBytes,
currentPartition.getTopic(), currentPartition.getPartition(), offset);
final T value = deserializer.deserialize(
new ConsumerRecord<>(
currentPartition.getTopic(),
currentPartition.getPartition(), keyBytes, valueBytes, offset));
if (deserializer.isEndOfStream(value)) {
// remove partition from subscribed partitions.
......
......@@ -27,7 +27,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescript
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
......@@ -112,7 +111,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KafkaDeserializationSchema<T> readSchema, Properties props) {
return new FlinkKafkaConsumer08<>(topics, readSchema, props);
}
......
......@@ -30,8 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
......@@ -110,7 +109,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
/**
* Creates a new Kafka streaming source consumer for Kafka 0.9.x
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param topic
......@@ -120,7 +119,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
* @param props
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer09(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
......@@ -137,7 +136,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
/**
......@@ -152,7 +151,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
* @param props
* The properties that are used to configure both the fetcher and the offset handler.
*/
public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer09(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(topics, null, deserializer, props);
}
......@@ -173,7 +172,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
*/
@PublicEvolving
public FlinkKafkaConsumer09(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
/**
......@@ -184,7 +183,7 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
* {@link FlinkKafkaConsumer09#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param subscriptionPattern
......@@ -195,14 +194,14 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
* The properties used to configure the Kafka consumer client, and the ZooKeeper client.
*/
@PublicEvolving
public FlinkKafkaConsumer09(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer09(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(null, subscriptionPattern, deserializer, props);
}
private FlinkKafkaConsumer09(
List<String> topics,
Pattern subscriptionPattern,
KeyedDeserializationSchema<T> deserializer,
KafkaDeserializationSchema<T> deserializer,
Properties props) {
super(
......
......@@ -23,12 +23,12 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
......@@ -60,7 +60,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
// ------------------------------------------------------------------------
/** The schema to convert between Kafka's byte messages, and Flink's objects. */
private final KeyedDeserializationSchema<T> deserializer;
private final KafkaDeserializationSchema<T> deserializer;
/** The handover of data and exceptions between the consumer thread and the task thread. */
private final Handover handover;
......@@ -82,7 +82,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
String taskNameWithSubtasks,
KeyedDeserializationSchema<T> deserializer,
KafkaDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
MetricGroup subtaskMetricGroup,
......@@ -139,9 +139,8 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> {
records.records(partition.getKafkaPartitionHandle());
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
final T value = deserializer.deserialize(
record.key(), record.value(),
record.topic(), record.partition(), record.offset());
final T value = deserializer.deserialize(record);
if (deserializer.isEndOfStream(value)) {
// end of stream signaled
......
......@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
......@@ -103,7 +102,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KafkaDeserializationSchema<T> readSchema, Properties props) {
return new FlinkKafkaConsumer09<>(topics, readSchema, props);
}
......
......@@ -24,12 +24,12 @@ import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
......@@ -115,7 +115,7 @@ public class Kafka09FetcherTest {
SourceContext<String> sourceContext = mock(SourceContext.class);
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
KafkaDeserializationSchema<String> schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
sourceContext,
......@@ -251,7 +251,7 @@ public class Kafka09FetcherTest {
SourceContext<String> sourceContext = mock(SourceContext.class);
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(new KafkaTopicPartition("test", 42), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
KafkaDeserializationSchema<String> schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
sourceContext,
......@@ -366,7 +366,7 @@ public class Kafka09FetcherTest {
BlockingSourceContext<String> sourceContext = new BlockingSourceContext<>();
Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
KafkaDeserializationSchema<String> schema = new KafkaDeserializationSchemaWrapper<>(new SimpleStringSchema());
final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
sourceContext,
......
......@@ -49,7 +49,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
......@@ -119,7 +118,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
private final KafkaTopicsDescriptor topicsDescriptor;
/** The schema to convert between Kafka's byte messages, and Flink's objects. */
protected final KeyedDeserializationSchema<T> deserializer;
protected final KafkaDeserializationSchema<T> deserializer;
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
......@@ -236,7 +235,7 @@ public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFuncti
public FlinkKafkaConsumerBase(
List<String> topics,
Pattern topicPattern,
KeyedDeserializationSchema<T> deserializer,
KafkaDeserializationSchema<T> deserializer,
long discoveryIntervalMillis,
boolean useMetrics) {
this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.Serializable;
/**
* The deserialization schema describes how to turn the Kafka ConsumerRecords
* into data types (Java/Scala objects) that are processed by Flink.
*
* @param <T> The type created by the keyed deserialization schema.
*/
@PublicEvolving
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
/**
* Method to decide whether the element signals the end of the stream. If
* true is returned the element won't be emitted.
*
* @param nextElement The element to test for the end-of-stream signal.
*
* @return True, if the element signals end of stream, false otherwise.
*/
boolean isEndOfStream(T nextElement);
/**
* Deserializes the Kafka record.
*
* @param record Kafka record to be deserialized.
*
* @return The deserialized message as an object (null if the message cannot be deserialized).
*/
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
}
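
Following the interface above, a hedged wiring sketch (not part of this commit) shows the two constructor shapes touched by this change: one taking the new KafkaDeserializationSchema directly, one taking a plain DeserializationSchema that the constructor wraps in a KafkaDeserializationSchemaWrapper. The broker address, group id, topic name, and the RecordWithOffsetSchema class from the earlier sketch are all placeholders.

import java.util.Collections;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class ConsumerWiringSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
		props.setProperty("group.id", "example-group");           // placeholder group id

		// Passing the new KafkaDeserializationSchema directly (hypothetical schema
		// from the sketch above).
		env.addSource(new FlinkKafkaConsumer010<>(
				"example-topic", new RecordWithOffsetSchema(), props))
			.print();

		// Passing a plain DeserializationSchema; the constructor wraps it in a
		// KafkaDeserializationSchemaWrapper internally.
		env.addSource(new FlinkKafkaConsumer010<>(
				Collections.singletonList("example-topic"), new SimpleStringSchema(), props))
			.print();

		env.execute("KafkaDeserializationSchema usage sketch");
	}
}
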
......@@ -20,28 +20,29 @@ package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import java.io.IOException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema
* A simple wrapper for using the DeserializationSchema with the KafkaDeserializationSchema
* interface.
* @param <T> The type created by the deserialization schema.
*/
@Internal
public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
public class KafkaDeserializationSchemaWrapper<T> implements KafkaDeserializationSchema<T> {
private static final long serialVersionUID = 2651665280744549932L;
private final DeserializationSchema<T> deserializationSchema;
public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
public KafkaDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
}
@Override
public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
return deserializationSchema.deserialize(message);
public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
return deserializationSchema.deserialize(record.value());
}
@Override
......
......@@ -19,12 +19,13 @@ package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
......@@ -39,7 +40,7 @@ import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;
* the "offset" (long), "topic" (String) and "partition" (int).
*/
@PublicEvolving
public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSchema<ObjectNode> {
public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {
private static final long serialVersionUID = 1509391548173891955L;
......@@ -51,22 +52,22 @@ public class JSONKeyValueDeserializationSchema implements KeyedDeserializationSc
}
@Override
public ObjectNode deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if (mapper == null) {
mapper = new ObjectMapper();
}
ObjectNode node = mapper.createObjectNode();
if (messageKey != null) {
node.set("key", mapper.readValue(messageKey, JsonNode.class));
if (record.key() != null) {
node.set("key", mapper.readValue(record.key(), JsonNode.class));
}
if (message != null) {
node.set("value", mapper.readValue(message, JsonNode.class));
if (record.value() != null) {
node.set("value", mapper.readValue(record.value(), JsonNode.class));
}
if (includeMetadata) {
node.putObject("metadata")
.put("offset", offset)
.put("topic", topic)
.put("partition", partition);
.put("offset", record.offset())
.put("topic", record.topic())
.put("partition", record.partition());
}
return node;
}
......
......@@ -18,10 +18,11 @@
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.IOException;
import java.io.Serializable;
/**
* The deserialization schema describes how to turn the byte key / value messages delivered by certain
......@@ -29,10 +30,12 @@ import java.io.Serializable;
* processed by Flink.
*
* @param <T> The type created by the keyed deserialization schema.
*
* @deprecated Use {@link KafkaDeserializationSchema}.
*/
@Deprecated
@PublicEvolving
public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
public interface KeyedDeserializationSchema<T> extends KafkaDeserializationSchema<T> {
/**
* Deserializes the byte message.
*
......@@ -45,13 +48,8 @@ public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQ
*/
T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
/**
* Method to decide whether the element signals the end of the stream. If
* true is returned the element won't be emitted.
*
* @param nextElement The element to test for the end-of-stream signal.
*
* @return True, if the element signals end of stream, false otherwise.
*/
boolean isEndOfStream(T nextElement);
@Override
default T deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
return deserialize(record.key(), record.value(), record.topic(), record.partition(), record.offset());
}
}
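
To make the bridge concrete, a hedged sketch of a pre-existing schema written against the old interface follows; the class name LegacyStringSchema is hypothetical. Because KeyedDeserializationSchema now inherits the default deserialize(ConsumerRecord) method shown above, such a class keeps compiling and running unchanged.

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

/** Hypothetical schema written against the old interface; nothing here changes. */
public class LegacyStringSchema implements KeyedDeserializationSchema<String> {

	private static final long serialVersionUID = 1L;

	@Override
	public String deserialize(byte[] messageKey, byte[] message, String topic,
			int partition, long offset) throws IOException {
		return new String(message, StandardCharsets.UTF_8);
	}

	@Override
	public boolean isEndOfStream(String nextElement) {
		return false;
	}

	@Override
	public TypeInformation<String> getProducedType() {
		return BasicTypeInfo.STRING_TYPE_INFO;
	}
}

// When a fetcher now calls deserialize(ConsumerRecord), the inherited default method
// forwards record.key()/value()/topic()/partition()/offset() to the five-argument
// method above, so existing implementations need no code changes.
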
......@@ -27,6 +27,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.io.IOException;
......@@ -38,7 +41,7 @@ import java.io.IOException;
* @param <V> The value type to be serialized.
*/
@Internal
public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {
public class TypeInformationKeyValueSerializationSchema<K, V> implements KafkaDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K, V>> {
private static final long serialVersionUID = -5359448468131559102L;
......@@ -96,16 +99,16 @@ public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDe
// ------------------------------------------------------------------------
@Override
public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
public Tuple2<K, V> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
K key = null;
V value = null;
if (messageKey != null) {
inputDeserializer.setBuffer(messageKey);
if (record.key() != null) {
inputDeserializer.setBuffer(record.key());
key = keySerializer.deserialize(inputDeserializer);
}
if (message != null) {
inputDeserializer.setBuffer(message);
if (record.value() != null) {
inputDeserializer.setBuffer(record.value());
value = valueSerializer.deserialize(inputDeserializer);
}
return new Tuple2<>(key, value);
......
......@@ -36,7 +36,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.SerializedValue;
......@@ -370,7 +369,7 @@ public class FlinkKafkaConsumerBaseMigrationTest {
super(
Arrays.asList("dummy-topic"),
null,
(KeyedDeserializationSchema< T >) mock(KeyedDeserializationSchema.class),
(KafkaDeserializationSchema< T >) mock(KafkaDeserializationSchema.class),
discoveryInterval,
false);
......
......@@ -51,7 +51,6 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
......@@ -904,7 +903,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
super(
Collections.singletonList("dummy-topic"),
null,
(KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class),
(KafkaDeserializationSchema< T >) mock(KafkaDeserializationSchema.class),
discoveryIntervalMillis,
false);
......@@ -957,7 +956,7 @@ public class FlinkKafkaConsumerBaseTest extends TestLogger {
TestingFlinkKafkaConsumer(final AbstractPartitionDiscoverer partitionDiscoverer, long discoveryIntervalMillis) {
super(Collections.singletonList("dummy-topic"),
null,
(KeyedDeserializationSchema < T >) mock(KeyedDeserializationSchema.class),
(KafkaDeserializationSchema < T >) mock(KafkaDeserializationSchema.class),
discoveryIntervalMillis,
false);
this.partitionDiscoverer = partitionDiscoverer;
......
......@@ -22,17 +22,17 @@ import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserialization
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
/**
* Tests for the{@link JSONKeyValueDeserializationSchema}.
*/
public class JSONKeyValueDeserializationSchemaTest {
@Test
public void testDeserializeWithoutMetadata() throws IOException {
public void testDeserializeWithoutMetadata() throws Exception {
ObjectMapper mapper = new ObjectMapper();
ObjectNode initialKey = mapper.createObjectNode();
initialKey.put("index", 4);
......@@ -43,7 +43,7 @@ public class JSONKeyValueDeserializationSchemaTest {
byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
ObjectNode deserializedValue = schema.deserialize(newConsumerRecord(serializedKey, serializedValue));
Assert.assertTrue(deserializedValue.get("metadata") == null);
Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
......@@ -51,7 +51,7 @@ public class JSONKeyValueDeserializationSchemaTest {
}
@Test
public void testDeserializeWithoutKey() throws IOException {
public void testDeserializeWithoutKey() throws Exception {
ObjectMapper mapper = new ObjectMapper();
byte[] serializedKey = null;
......@@ -60,15 +60,26 @@ public class JSONKeyValueDeserializationSchemaTest {
byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
ObjectNode deserializedValue = schema.deserialize(newConsumerRecord(serializedKey, serializedValue));
Assert.assertTrue(deserializedValue.get("metadata") == null);
Assert.assertTrue(deserializedValue.get("key") == null);
Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
}
private static ConsumerRecord<byte[], byte[]> newConsumerRecord(
byte[] serializedKey, byte[] serializedValue) {
return newConsumerRecord("", 0, 0L, serializedKey, serializedValue);
}
private static ConsumerRecord<byte[], byte[]> newConsumerRecord(
String topic, int partition, long offset, byte[] serializedKey, byte[] serializedValue) {
return new ConsumerRecord<>(topic, partition, offset, serializedKey, serializedValue);
}
@Test
public void testDeserializeWithoutValue() throws IOException {
public void testDeserializeWithoutValue() throws Exception {
ObjectMapper mapper = new ObjectMapper();
ObjectNode initialKey = mapper.createObjectNode();
initialKey.put("index", 4);
......@@ -77,7 +88,7 @@ public class JSONKeyValueDeserializationSchemaTest {
byte[] serializedValue = null;
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(false);
ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "", 0, 0);
ObjectNode deserializedValue = schema.deserialize(newConsumerRecord(serializedKey, serializedValue));
Assert.assertTrue(deserializedValue.get("metadata") == null);
Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
......@@ -85,7 +96,7 @@ public class JSONKeyValueDeserializationSchemaTest {
}
@Test
public void testDeserializeWithMetadata() throws IOException {
public void testDeserializeWithMetadata() throws Exception {
ObjectMapper mapper = new ObjectMapper();
ObjectNode initialKey = mapper.createObjectNode();
initialKey.put("index", 4);
......@@ -96,7 +107,9 @@ public class JSONKeyValueDeserializationSchemaTest {
byte[] serializedValue = mapper.writeValueAsBytes(initialValue);
JSONKeyValueDeserializationSchema schema = new JSONKeyValueDeserializationSchema(true);
ObjectNode deserializedValue = schema.deserialize(serializedKey, serializedValue, "topic#1", 3, 4);
final ConsumerRecord<byte[], byte[]> consumerRecord =
newConsumerRecord("topic#1", 3, 4L, serializedKey, serializedValue);
ObjectNode deserializedValue = schema.deserialize(consumerRecord);
Assert.assertEquals(4, deserializedValue.get("key").get("index").asInt());
Assert.assertEquals("world", deserializedValue.get("value").get("word").asText());
......
......@@ -65,8 +65,7 @@ import org.apache.flink.streaming.connectors.kafka.testutils.PartitionValidating
import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper;
import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;
......@@ -80,6 +79,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import kafka.server.KafkaServer;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.Assert;
......@@ -421,8 +421,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
new KeyedSerializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KeyedDeserializationSchemaWrapper<>(
final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KafkaDeserializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
// setup and run the latest-consuming job
......@@ -1417,7 +1417,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
env.setRestartStrategy(RestartStrategies.noRestart());
env.getConfig().disableSysoutLogging();
KeyedDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
KafkaDeserializationSchema<Tuple2<Long, PojoValue>> readSchema = new TypeInformationKeyValueSerializationSchema<>(Long.class, PojoValue.class, env.getConfig());
Properties props = new Properties();
props.putAll(standardProps);
......@@ -1865,8 +1865,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
new KeyedSerializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KeyedDeserializationSchemaWrapper<>(
final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KafkaDeserializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
final int maxNumAttempts = 10;
......@@ -1963,8 +1963,8 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
new KeyedSerializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
final KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KeyedDeserializationSchemaWrapper<>(
final KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema =
new KafkaDeserializationSchemaWrapper<>(
new TypeInformationSerializationSchema<>(resultType, new ExecutionConfig()));
// -------- Write the append sequence --------
......@@ -2025,7 +2025,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
private boolean validateSequence(
final String topic,
final int parallelism,
KeyedDeserializationSchema<Tuple2<Integer, Integer>> deserSchema,
KafkaDeserializationSchema<Tuple2<Integer, Integer>> deserSchema,
final int totalNumElements) throws Exception {
final StreamExecutionEnvironment readEnv = StreamExecutionEnvironment.getExecutionEnvironment();
......@@ -2194,7 +2194,7 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
}
}
private static class Tuple2WithTopicSchema implements KeyedDeserializationSchema<Tuple3<Integer, Integer, String>>,
private static class Tuple2WithTopicSchema implements KafkaDeserializationSchema<Tuple3<Integer, Integer, String>>,
KeyedSerializationSchema<Tuple3<Integer, Integer, String>> {
private final TypeSerializer<Tuple2<Integer, Integer>> ts;
......@@ -2204,10 +2204,10 @@ public abstract class KafkaConsumerTestBase extends KafkaTestBaseWithFlink {
}
@Override
public Tuple3<Integer, Integer, String> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
public Tuple3<Integer, Integer, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
Tuple2<Integer, Integer> t2 = ts.deserialize(in);
return new Tuple3<>(t2.f0, t2.f1, topic);
return new Tuple3<>(t2.f0, t2.f1, record.topic());
}
@Override
......
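Because the schema now receives the whole ConsumerRecord, an implementation can also read fields that the old (messageKey, message, topic, partition, offset) signature never exposed, such as the record timestamp. A hedged sketch, not part of this change, illustrating that:

import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative only: pairs each value with the Kafka record timestamp.
private static class TimestampedStringSchema implements KafkaDeserializationSchema<Tuple2<Long, String>> {

    @Override
    public Tuple2<Long, String> deserialize(ConsumerRecord<byte[], byte[]> record) {
        // record.timestamp() is reachable only because the full record is passed in.
        return new Tuple2<>(record.timestamp(), new String(record.value(), StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Tuple2<Long, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<Long, String>>() {});
    }
}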
......@@ -30,11 +30,11 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.InstantiationUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
......@@ -42,7 +42,6 @@ import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.Properties;
......@@ -188,12 +187,13 @@ public class KafkaShortRetentionTestBase implements Serializable {
kafkaServer.deleteTestTopic(topic);
}
private class NonContinousOffsetsDeserializationSchema implements KeyedDeserializationSchema<String> {
private class NonContinousOffsetsDeserializationSchema implements KafkaDeserializationSchema<String> {
private int numJumps;
long nextExpected = 0;
@Override
public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
public String deserialize(ConsumerRecord<byte[], byte[]> record) {
final long offset = record.offset();
if (offset != nextExpected) {
numJumps++;
nextExpected = offset;
......
......@@ -23,8 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import kafka.server.KafkaServer;
......@@ -128,10 +127,10 @@ public abstract class KafkaTestEnvironment {
// -- consumer / producer instances:
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, DeserializationSchema<T> deserializationSchema, Properties props) {
return getConsumer(topics, new KeyedDeserializationSchemaWrapper<T>(deserializationSchema), props);
return getConsumer(topics, new KafkaDeserializationSchemaWrapper<T>(deserializationSchema), props);
}
public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, KeyedDeserializationSchema<T> readSchema, Properties props) {
public <T> FlinkKafkaConsumerBase<T> getConsumer(String topic, KafkaDeserializationSchema<T> readSchema, Properties props) {
return getConsumer(Collections.singletonList(topic), readSchema, props);
}
......@@ -139,7 +138,7 @@ public abstract class KafkaTestEnvironment {
return getConsumer(Collections.singletonList(topic), deserializationSchema, props);
}
public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KafkaDeserializationSchema<T> readSchema, Properties props);
public abstract <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(
Properties properties,
......
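The DeserializationSchema overload above delegates to the KafkaDeserializationSchema overload by wrapping the value schema. A minimal sketch of what such a wrapper is assumed to do, namely forward only the value bytes and drop the rest of the record (the real class in Flink may differ in details):

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class KafkaDeserializationSchemaWrapper<T> implements KafkaDeserializationSchema<T> {

    private final DeserializationSchema<T> deserializationSchema;

    public KafkaDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Key, topic, partition, and offset are ignored; only the value is forwarded.
        return deserializationSchema.deserialize(record.value());
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return deserializationSchema.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return deserializationSchema.getProducedType();
    }
}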
......@@ -30,8 +30,7 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.KafkaDeserializationSchemaWrapper;
import org.apache.flink.util.PropertiesUtil;
import org.apache.flink.util.SerializedValue;
......@@ -105,14 +104,14 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
/**
* Creates a new Kafka streaming source consumer.
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param topic The name of the topic that should be consumed.
* @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
*/
public FlinkKafkaConsumer(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
......@@ -126,7 +125,7 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
* @param props
*/
public FlinkKafkaConsumer(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
/**
......@@ -138,7 +137,7 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
* @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
*/
public FlinkKafkaConsumer(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(topics, null, deserializer, props);
}
......@@ -155,7 +154,7 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
* @param props
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(null, subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
this(null, subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
/**
......@@ -166,21 +165,21 @@ public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {
* {@link FlinkKafkaConsumer#KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS} in the properties), topics
* with names matching the pattern will also be subscribed to as they are created on the fly.
*
* <p>This constructor allows passing a {@see KeyedDeserializationSchema} for reading key/value
* <p>This constructor allows passing a {@see KafkaDeserializationSchema} for reading key/value
* pairs, offsets, and topic names from Kafka.
*
* @param subscriptionPattern The regular expression for a pattern of topic names to subscribe to.
* @param deserializer The keyed de-/serializer used to convert between Kafka's byte messages and Flink's objects.
* @param props
*/
public FlinkKafkaConsumer(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
public FlinkKafkaConsumer(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(null, subscriptionPattern, deserializer, props);
}
private FlinkKafkaConsumer(
List<String> topics,
Pattern subscriptionPattern,
KeyedDeserializationSchema<T> deserializer,
KafkaDeserializationSchema<T> deserializer,
Properties props) {
super(
......
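For the constructors above, a short usage sketch that wires a record-based schema into the universal consumer; the broker address, group id, topic name, and job name are placeholders, and JSONKeyValueDeserializationSchema is the metadata-aware schema exercised in the tests earlier in this change:

import java.util.Properties;
import com.fasterxml.jackson.databind.node.ObjectNode; // may be Flink's shaded Jackson depending on the Flink version
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;

public class KafkaDeserializationSchemaExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "example-group");           // placeholder

        // 'true' asks the schema to also emit topic/partition/offset metadata with each record.
        FlinkKafkaConsumer<ObjectNode> consumer =
            new FlinkKafkaConsumer<>("example-topic", new JSONKeyValueDeserializationSchema(true), props);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(consumer).print();
        env.execute("kafka-deserialization-schema-example");
    }
}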
......@@ -22,12 +22,12 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
......@@ -59,7 +59,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
// ------------------------------------------------------------------------
/** The schema to convert between Kafka's byte messages, and Flink's objects. */
private final KeyedDeserializationSchema<T> deserializer;
private final KafkaDeserializationSchema<T> deserializer;
/** The handover of data and exceptions between the consumer thread and the task thread. */
private final Handover handover;
......@@ -81,7 +81,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
long autoWatermarkInterval,
ClassLoader userCodeClassLoader,
String taskNameWithSubtasks,
KeyedDeserializationSchema<T> deserializer,
KafkaDeserializationSchema<T> deserializer,
Properties kafkaProperties,
long pollTimeout,
MetricGroup subtaskMetricGroup,
......@@ -137,9 +137,7 @@ public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {
records.records(partition.getKafkaPartitionHandle());
for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
final T value = deserializer.deserialize(
record.key(), record.value(),
record.topic(), record.partition(), record.offset());
final T value = deserializer.deserialize(record);
if (deserializer.isEndOfStream(value)) {
// end of stream signaled
......
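The fetcher now hands the whole ConsumerRecord to the schema in a single call. The old five-argument signature, visible in the removed lines throughout this change, can be bridged to the record-based one with a default method; a hedged sketch of such a bridge (the actual interface in Flink may differ):

import java.io.IOException;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public interface KeyedDeserializationSchema<T> extends KafkaDeserializationSchema<T> {

    // Old signature that existing implementations already provide.
    T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;

    // Bridge: unpack the record and delegate, so legacy schemas keep working
    // with the record-based call used by the fetcher above.
    @Override
    default T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        return deserialize(record.key(), record.value(), record.topic(), record.partition(), record.offset());
    }
}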
......@@ -36,9 +36,9 @@ import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -319,7 +319,7 @@ public class KafkaITCase extends KafkaConsumerTestBase {
}
}
private static class LimitedLongDeserializer implements KeyedDeserializationSchema<Long> {
private static class LimitedLongDeserializer implements KafkaDeserializationSchema<Long> {
private static final long serialVersionUID = 6966177118923713521L;
private final TypeInformation<Long> ti;
......@@ -337,9 +337,9 @@ public class KafkaITCase extends KafkaConsumerTestBase {
}
@Override
public Long deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
public Long deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException {
cnt++;
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(message));
DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(record.value()));
Long e = ser.deserialize(in);
return e;
}
......@@ -349,5 +349,4 @@ public class KafkaITCase extends KafkaConsumerTestBase {
return cnt > 1110L;
}
}
}
......@@ -23,7 +23,6 @@ import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
......@@ -227,7 +226,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
}
@Override
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KafkaDeserializationSchema<T> readSchema, Properties props) {
return new FlinkKafkaConsumer<T>(topics, readSchema, props);
}
......