diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 728cc26d7a8025b4f476484ce99766be8907f5f1..1c6896f4d5991a9e8f49df78e53dd58c93fbfe64 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -30,6 +30,7 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.ObjectStreamClass; +import java.io.Serializable; import java.lang.reflect.Constructor; import java.lang.reflect.Modifier; import java.util.HashMap; @@ -298,6 +299,20 @@ public final class InstantiationUtil { return baos.toByteArray(); } } + + /** + * Clones the given serializable object using Java serialization. + * + * @param obj Object to clone + * @param Type of the object to clone + * @return Cloned object + * @throws IOException + * @throws ClassNotFoundException + */ + public static T clone(T obj) throws IOException, ClassNotFoundException { + final byte[] serializedObject = serializeObject(obj); + return deserializeObject(serializedObject, obj.getClass().getClassLoader()); + } // -------------------------------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java index 9fec52d3e9671808204688ebe3e65dfdab565878..164cbac9829ccca165d873757e8f5fd5513bf33c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java @@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.StringUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Node; @@ -199,8 +200,10 @@ public class LegacyFetcher implements Fetcher { FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]); + final KeyedDeserializationSchema clonedDeserializer = InstantiationUtil.clone(deserializer); + SimpleConsumerThread thread = new SimpleConsumerThread<>(this, config, - broker, partitions, sourceContext, deserializer, lastOffsets); + broker, partitions, sourceContext, clonedDeserializer, lastOffsets); thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", taskName, broker.id(), broker.host(), broker.port()));