提交 8fddae8d 编写于 作者: U Ufuk Celebi

[FLINK-5484] [serialization] Add test for registered Kryo types

上级 8f4139a4
0,int
1,java.lang.String
2,float
3,boolean
4,byte
5,char
6,short
7,long
8,double
9,void
10,scala.collection.convert.Wrappers$SeqWrapper
11,scala.collection.convert.Wrappers$IteratorWrapper
12,scala.collection.convert.Wrappers$MapWrapper
13,scala.collection.convert.Wrappers$JListWrapper
14,scala.collection.convert.Wrappers$JMapWrapper
15,scala.Some
16,scala.util.Left
17,scala.util.Right
18,scala.collection.immutable.Vector
19,scala.collection.immutable.Set$Set1
20,scala.collection.immutable.Set$Set2
21,scala.collection.immutable.Set$Set3
22,scala.collection.immutable.Set$Set4
23,scala.collection.immutable.HashSet$HashTrieSet
24,scala.collection.immutable.Map$Map1
25,scala.collection.immutable.Map$Map2
26,scala.collection.immutable.Map$Map3
27,scala.collection.immutable.Map$Map4
28,scala.collection.immutable.HashMap$HashTrieMap
29,scala.collection.immutable.Range$Inclusive
30,scala.collection.immutable.NumericRange$Inclusive
31,scala.collection.immutable.NumericRange$Exclusive
32,scala.collection.mutable.BitSet
33,scala.collection.mutable.HashMap
34,scala.collection.mutable.HashSet
35,scala.collection.convert.Wrappers$IterableWrapper
36,scala.Tuple1
37,scala.Tuple2
38,scala.Tuple3
39,scala.Tuple4
40,scala.Tuple5
41,scala.Tuple6
42,scala.Tuple7
43,scala.Tuple8
44,scala.Tuple9
45,scala.Tuple10
46,scala.Tuple11
47,scala.Tuple12
48,scala.Tuple13
49,scala.Tuple14
50,scala.Tuple15
51,scala.Tuple16
52,scala.Tuple17
53,scala.Tuple18
54,scala.Tuple19
55,scala.Tuple20
56,scala.Tuple21
57,scala.Tuple22
58,scala.Tuple1$mcJ$sp
59,scala.Tuple1$mcI$sp
60,scala.Tuple1$mcD$sp
61,scala.Tuple2$mcJJ$sp
62,scala.Tuple2$mcJI$sp
63,scala.Tuple2$mcJD$sp
64,scala.Tuple2$mcIJ$sp
65,scala.Tuple2$mcII$sp
66,scala.Tuple2$mcID$sp
67,scala.Tuple2$mcDJ$sp
68,scala.Tuple2$mcDI$sp
69,scala.Tuple2$mcDD$sp
70,scala.Symbol
71,scala.reflect.ClassTag
72,scala.runtime.BoxedUnit
73,java.util.Arrays$ArrayList
74,java.util.BitSet
75,java.util.PriorityQueue
76,java.util.regex.Pattern
77,java.sql.Date
78,java.sql.Time
79,java.sql.Timestamp
80,java.net.URI
81,java.net.InetSocketAddress
82,java.util.UUID
83,java.util.Locale
84,java.text.SimpleDateFormat
85,org.apache.avro.generic.GenericData$Array
......@@ -17,17 +17,19 @@
*/
package org.apache.flink.api.scala.runtime
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import java.io._
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.SerializerTestInstance
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.joda.time.LocalDate
import org.junit.Test
import scala.collection.mutable
import scala.io.Source
import scala.reflect._
class KryoGenericTypeSerializerTest {
......@@ -146,6 +148,96 @@ class KryoGenericTypeSerializerTest {
runTests(list)
}
/**
* Tests that the registered classes in Kryo did not change.
*
* Once we have proper serializer versioning this test will become obsolete.
* But currently a change in the serializers can break savepoint backwards
* compatability between Flink versions.
*/
@Test
def testDefaultKryoRegisteredClassesDidNotChange(): Unit = {
// Previous registration (id => registered class (Class#getName))
val previousRegistrations: mutable.HashMap[Int, String] = mutable.HashMap[Int, String]()
val stream = Thread.currentThread().getContextClassLoader()
.getResourceAsStream("flink_11-kryo_registrations")
Source.fromInputStream(stream).getLines().foreach{
line =>
val Array(id, registeredClass) = line.split(",")
previousRegistrations.put(id.toInt, registeredClass)
}
// Get Kryo and verify that the registered IDs and types in
// Kryo have not changed compared to the provided registrations
// file.
val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo
val nextId = kryo.getNextRegistrationId
for (i <- 0 until nextId) {
val registration = kryo.getRegistration(i)
previousRegistrations.get(registration.getId) match {
case None => throw new IllegalStateException(s"Expected no entry with ID " +
s"${registration.getId}, but got one for type ${registration.getType.getName}. This " +
s"can lead to registered user types being deserialized with the wrong serializer when " +
s"restoring a savepoint.")
case Some(registeredClass) =>
if (registeredClass != registration.getType.getName) {
throw new IllegalStateException(s"Expected type ${registration.getType.getName} with " +
s"ID ${registration.getId}, but got $registeredClass.")
}
}
}
// Verify number of registrations (required to check if current number of
// registrations is less than before).
if (previousRegistrations.size != nextId) {
throw new IllegalStateException(s"Number of registered classes changed (previously " +
s"${previousRegistrations.size}, but now $nextId). This can lead to registered user " +
s"types being deserialized with the wrong serializer when restoring a savepoint.")
}
}
/**
* Creates a Kryo serializer and writes the default registrations out to a
* comma separated file with one entry per line:
*
* id,class
*
* The produced file is used to check that the registered IDs don't change
* in future Flink versions.
*
* This method is not used in the tests, but documents how the test file
* has been created and can be used to re-create it if needed.
*
* @param filePath File path to write registrations to
*/
private def writeDefaultKryoRegistrations(filePath: String) = {
val file = new File(filePath)
if (file.exists()) {
file.delete()
}
val writer = new BufferedWriter(new FileWriter(file))
try {
val kryo = new KryoSerializer[Integer](classOf[Integer], new ExecutionConfig()).getKryo
val nextId = kryo.getNextRegistrationId
for (i <- 0 until nextId) {
val registration = kryo.getRegistration(i)
val str = registration.getId + "," + registration.getType.getName
writer.write(str, 0, str.length)
writer.newLine()
}
println(s"Created file with registrations at $file.")
} finally {
writer.close()
}
}
case class ComplexType(id: String, number: Int, values: List[Int]){
override def equals(obj: Any): Boolean ={
if(obj != null && obj.isInstanceOf[ComplexType]){
......
......@@ -881,6 +881,7 @@ under the License.
<!-- Test Data. -->
<exclude>flink-tests/src/test/resources/testdata/terainput.txt</exclude>
<exclude>flink-tests/src/test/resources/flink_11-kryo_registrations</exclude>
<exclude>flink-connectors/flink-avro/src/test/resources/avro/*.avsc</exclude>
<exclude>out/test/flink-avro/avro/user.avsc</exclude>
<exclude>flink-libraries/flink-table/src/test/scala/resources/*.out</exclude>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册