提交 8af6ef49 编写于 作者: A Aljoscha Krettek 提交者: Stephan Ewen

[FLINK-1378] Add support for Throwables in KryoSerializer

上级 935e316a
......@@ -384,6 +384,9 @@ public abstract class SerializerTestBase<T> {
assertArrayEquals(message, (Object[]) should, (Object[]) is);
}
}
else if (should instanceof Throwable) {
assertEquals(((Throwable)should).getMessage(), ((Throwable)is).getMessage());
}
else {
assertEquals(message, should, is);
}
......
......@@ -22,6 +22,7 @@ import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.twitter.chill.ScalaKryoInstantiator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -114,7 +115,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
output = new Output(outputStream);
previousOut = target;
}
try {
kryo.writeClassAndObject(output, record);
output.flush();
......@@ -180,6 +181,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new ScalaKryoInstantiator().newKryo();
this.kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
this.kryo.setRegistrationRequired(false);
this.kryo.register(type);
this.kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
......
......@@ -25,6 +25,13 @@ import scala.reflect._
class KryoGenericTypeSerializerTest {
@Test
def testThrowableSerialization: Unit = {
val a = List(new RuntimeException("Hello"), new RuntimeException("there"))
runTests(a)
}
@Test
def testScalaListSerialization: Unit = {
val a = List(42,1,49,1337)
......@@ -43,14 +50,14 @@ class KryoGenericTypeSerializerTest {
def testScalaMapSerialization: Unit = {
val a = Map(("1" -> 1), ("2" -> 2), ("42" -> 42), ("1337" -> 1337))
runTests(a)
runTests(Seq(a))
}
@Test
def testMutableMapSerialization: Unit ={
val a = scala.collection.mutable.Map((1 -> "1"), (2 -> "2"), (3 -> "3"))
runTests(a)
runTests(Seq(a))
}
@Test
......@@ -115,7 +122,7 @@ class KryoGenericTypeSerializerTest {
}
}
def runTests[T : ClassTag](objects: T *): Unit ={
def runTests[T : ClassTag](objects: Seq[T]): Unit ={
val clsTag = classTag[T]
val typeInfo = new GenericTypeInfo[T](clsTag.runtimeClass.asInstanceOf[Class[T]])
val serializer = typeInfo.createSerializer()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册