From 2eb2a0ef352f75a65a45a5a247450ae61ae5ab17 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Thu, 4 Feb 2016 21:14:39 +0100 Subject: [PATCH] [FLINK-3338] [kafka] Use proper classloader when cloning the deserialization schema. This closes #1590 --- .../apache/flink/util/InstantiationUtil.java | 29 +++++++++++++++++-- .../kafka/FlinkKafkaConsumer08.java | 3 +- .../kafka/internals/LegacyFetcher.java | 26 ++++++++++++----- .../kafka/testutils/MockRuntimeContext.java | 2 +- 4 files changed, 48 insertions(+), 12 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java index 1c6896f4d59..e2439ca0985 100644 --- a/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java +++ b/flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java @@ -310,8 +310,33 @@ public final class InstantiationUtil { * @throws ClassNotFoundException */ public static T clone(T obj) throws IOException, ClassNotFoundException { - final byte[] serializedObject = serializeObject(obj); - return deserializeObject(serializedObject, obj.getClass().getClassLoader()); + if (obj == null) { + return null; + } else { + return clone(obj, obj.getClass().getClassLoader()); + } + } + + /** + * Clones the given serializable object using Java serialization, using the given classloader to + * resolve the cloned classes. + * + * @param obj Object to clone + * @param classLoader The classloader to resolve the classes during deserialization. 
+ * @param <T> Type of the object to clone + * + * @return Cloned object + * + * @throws IOException + * @throws ClassNotFoundException + */ + public static <T extends Serializable> T clone(T obj, ClassLoader classLoader) throws IOException, ClassNotFoundException { + if (obj == null) { + return null; + } else { + final byte[] serializedObject = serializeObject(obj); + return deserializeObject(serializedObject, classLoader); + } } // -------------------------------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index 543e0ff1a5e..bdea37f6019 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -251,7 +251,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { } // create fetcher - fetcher = new LegacyFetcher(this.subscribedPartitions, props, getRuntimeContext().getTaskName()); + fetcher = new LegacyFetcher(this.subscribedPartitions, props, + getRuntimeContext().getTaskName(), getRuntimeContext().getUserCodeClassLoader()); // offset handling offsetHandler = new ZookeeperOffsetHandler(props); diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java index 164cbac9829..fe7f77791cd 100644 --- 
a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java @@ -31,11 +31,12 @@ import kafka.message.MessageAndOffset; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; - import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.StringUtils; + import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.Node; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ import java.util.Map; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; /** * This fetcher uses Kafka's low-level API to pull data from a specific @@ -70,6 +71,9 @@ public class LegacyFetcher implements Fetcher { /** The first error that occurred in a connection thread */ private final AtomicReference error; + + /** The classloader for dynamically loaded classes */ + private final ClassLoader userCodeClassloader; /** The partitions that the fetcher should read, with their starting offsets */ private Map partitionsToRead; @@ -86,8 +90,13 @@ public class LegacyFetcher implements Fetcher { /** Flag to shot the fetcher down */ private volatile boolean running = true; - public LegacyFetcher(List partitions, Properties props, String taskName) { - this.config = checkNotNull(props, "The config properties cannot be null"); + public LegacyFetcher( + List partitions, Properties props, + String taskName, ClassLoader userCodeClassloader) { + + this.config = requireNonNull(props, "The config properties cannot be 
null"); + this.userCodeClassloader = requireNonNull(userCodeClassloader); + //this.topic = checkNotNull(topic, "The topic cannot be null"); this.partitionsToRead = new HashMap<>(); for (KafkaTopicPartitionLeader p: partitions) { @@ -200,7 +209,8 @@ public class LegacyFetcher implements Fetcher { FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]); - final KeyedDeserializationSchema<T> clonedDeserializer = InstantiationUtil.clone(deserializer); + final KeyedDeserializationSchema<T> clonedDeserializer = + InstantiationUtil.clone(deserializer, userCodeClassloader); SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, broker, partitions, sourceContext, clonedDeserializer, lastOffsets); @@ -344,9 +354,9 @@ public class LegacyFetcher implements Fetcher { this.config = config; this.broker = broker; this.partitions = partitions; - this.sourceContext = checkNotNull(sourceContext); - this.deserializer = checkNotNull(deserializer); - this.offsetsState = checkNotNull(offsetsState); + this.sourceContext = requireNonNull(sourceContext); + this.deserializer = requireNonNull(deserializer); + this.offsetsState = requireNonNull(offsetsState); } @Override diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index cd44236ad22..17e2e6ff02c 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -98,7 +98,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { @Override public ClassLoader getUserCodeClassLoader() { 
- throw new UnsupportedOperationException(); + return getClass().getClassLoader(); } @Override -- GitLab