提交 2eb2a0ef 编写于 作者: S Stephan Ewen

[FLINK-3338] [kafka] Use proper classloader when cloning the deserialization schema.

This closes #1590
上级 fe0c3b53
...@@ -310,8 +310,33 @@ public final class InstantiationUtil { ...@@ -310,8 +310,33 @@ public final class InstantiationUtil {
* @throws ClassNotFoundException * @throws ClassNotFoundException
*/ */
public static <T extends Serializable> T clone(T obj) throws IOException, ClassNotFoundException { public static <T extends Serializable> T clone(T obj) throws IOException, ClassNotFoundException {
if (obj == null) {
return null;
} else {
return clone(obj, obj.getClass().getClassLoader());
}
}
/**
* Clones the given serializable object using Java serialization, using the given classloader to
* resolve the cloned classes.
*
* @param obj Object to clone
* @param classLoader The classloader to resolve the classes during deserialization.
* @param <T> Type of the object to clone
*
* @return Cloned object
*
* @throws IOException
* @throws ClassNotFoundException
*/
public static <T extends Serializable> T clone(T obj, ClassLoader classLoader) throws IOException, ClassNotFoundException {
if (obj == null) {
return null;
} else {
final byte[] serializedObject = serializeObject(obj); final byte[] serializedObject = serializeObject(obj);
return deserializeObject(serializedObject, obj.getClass().getClassLoader()); return deserializeObject(serializedObject, classLoader);
}
} }
// -------------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------------
......
...@@ -251,7 +251,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> { ...@@ -251,7 +251,8 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
} }
// create fetcher // create fetcher
fetcher = new LegacyFetcher(this.subscribedPartitions, props, getRuntimeContext().getTaskName()); fetcher = new LegacyFetcher(this.subscribedPartitions, props,
getRuntimeContext().getTaskName(), getRuntimeContext().getUserCodeClassLoader());
// offset handling // offset handling
offsetHandler = new ZookeeperOffsetHandler(props); offsetHandler = new ZookeeperOffsetHandler(props);
......
...@@ -31,11 +31,12 @@ import kafka.message.MessageAndOffset; ...@@ -31,11 +31,12 @@ import kafka.message.MessageAndOffset;
import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.StringUtils; import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node; import org.apache.kafka.common.Node;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -49,7 +50,7 @@ import java.util.Map; ...@@ -49,7 +50,7 @@ import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Objects.requireNonNull;
/** /**
* This fetcher uses Kafka's low-level API to pull data from a specific * This fetcher uses Kafka's low-level API to pull data from a specific
...@@ -71,6 +72,9 @@ public class LegacyFetcher implements Fetcher { ...@@ -71,6 +72,9 @@ public class LegacyFetcher implements Fetcher {
/** The first error that occurred in a connection thread */ /** The first error that occurred in a connection thread */
private final AtomicReference<Throwable> error; private final AtomicReference<Throwable> error;
/** The classloader for dynamically loaded classes */
private final ClassLoader userCodeClassloader;
/** The partitions that the fetcher should read, with their starting offsets */ /** The partitions that the fetcher should read, with their starting offsets */
private Map<KafkaTopicPartitionLeader, Long> partitionsToRead; private Map<KafkaTopicPartitionLeader, Long> partitionsToRead;
...@@ -86,8 +90,13 @@ public class LegacyFetcher implements Fetcher { ...@@ -86,8 +90,13 @@ public class LegacyFetcher implements Fetcher {
/** Flag to shot the fetcher down */ /** Flag to shot the fetcher down */
private volatile boolean running = true; private volatile boolean running = true;
public LegacyFetcher(List<KafkaTopicPartitionLeader> partitions, Properties props, String taskName) { public LegacyFetcher(
this.config = checkNotNull(props, "The config properties cannot be null"); List<KafkaTopicPartitionLeader> partitions, Properties props,
String taskName, ClassLoader userCodeClassloader) {
this.config = requireNonNull(props, "The config properties cannot be null");
this.userCodeClassloader = requireNonNull(userCodeClassloader);
//this.topic = checkNotNull(topic, "The topic cannot be null"); //this.topic = checkNotNull(topic, "The topic cannot be null");
this.partitionsToRead = new HashMap<>(); this.partitionsToRead = new HashMap<>();
for (KafkaTopicPartitionLeader p: partitions) { for (KafkaTopicPartitionLeader p: partitions) {
...@@ -200,7 +209,8 @@ public class LegacyFetcher implements Fetcher { ...@@ -200,7 +209,8 @@ public class LegacyFetcher implements Fetcher {
FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]); FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
final KeyedDeserializationSchema<T> clonedDeserializer = InstantiationUtil.clone(deserializer); final KeyedDeserializationSchema<T> clonedDeserializer =
InstantiationUtil.clone(deserializer, userCodeClassloader);
SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config, SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config,
broker, partitions, sourceContext, clonedDeserializer, lastOffsets); broker, partitions, sourceContext, clonedDeserializer, lastOffsets);
...@@ -344,9 +354,9 @@ public class LegacyFetcher implements Fetcher { ...@@ -344,9 +354,9 @@ public class LegacyFetcher implements Fetcher {
this.config = config; this.config = config;
this.broker = broker; this.broker = broker;
this.partitions = partitions; this.partitions = partitions;
this.sourceContext = checkNotNull(sourceContext); this.sourceContext = requireNonNull(sourceContext);
this.deserializer = checkNotNull(deserializer); this.deserializer = requireNonNull(deserializer);
this.offsetsState = checkNotNull(offsetsState); this.offsetsState = requireNonNull(offsetsState);
} }
@Override @Override
......
...@@ -98,7 +98,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext { ...@@ -98,7 +98,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
@Override @Override
public ClassLoader getUserCodeClassLoader() { public ClassLoader getUserCodeClassLoader() {
throw new UnsupportedOperationException(); return getClass().getClassLoader();
} }
@Override @Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册