提交 4f8b9bb5 编写于 作者: T Till Rohrmann

[FLINK-3313] [kafka] Fix TypeInformationSerializationSchema usage in LegacyFetcher

The LegacyFetcher used the given KeyedDeserializationSchema across multiple threads even though
it is not thread-safe. This commit fixes the problem by cloning the KeyedDeserializationSchema
before giving it to the SimpleConsumerThread.

Add clone method for Java serializable objects to InstantiationUtil

This closes #1577.
上级 95f9bc82
......@@ -30,6 +30,7 @@ import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.HashMap;
......@@ -298,6 +299,20 @@ public final class InstantiationUtil {
return baos.toByteArray();
}
}
/**
* Clones the given serializable object using Java serialization.
*
* @param obj Object to clone
* @param <T> Type of the object to clone
* @return Cloned object
* @throws IOException
* @throws ClassNotFoundException
*/
public static <T extends Serializable> T clone(T obj) throws IOException, ClassNotFoundException {
final byte[] serializedObject = serializeObject(obj);
return deserializeObject(serializedObject, obj.getClass().getClassLoader());
}
// --------------------------------------------------------------------------------------------
......
......@@ -32,6 +32,7 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.Node;
......@@ -199,8 +200,10 @@ public class LegacyFetcher implements Fetcher {
FetchPartition[] partitions = partitionsList.toArray(new FetchPartition[partitionsList.size()]);
final KeyedDeserializationSchema<T> clonedDeserializer = InstantiationUtil.clone(deserializer);
SimpleConsumerThread<T> thread = new SimpleConsumerThread<>(this, config,
broker, partitions, sourceContext, deserializer, lastOffsets);
broker, partitions, sourceContext, clonedDeserializer, lastOffsets);
thread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
taskName, broker.id(), broker.host(), broker.port()));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册