[FLINK-11334] Fix migration check in ScalaEnumSerializerSnapshot

First, this inverts the if statements to make the code more readable
and maintainable.

Then, this changes the check to report *incompatible* when the order
of enums changes or when enums are removed because we currently don't
maintain a map of enum ids to enum values ourselves and therefore cannot
migrate or reconfigure the serializer to read data that was serialized
with the old mapping.
上级 ed2a182e
......@@ -79,32 +79,29 @@ class ScalaEnumSerializerSnapshot[E <: Enumeration]
override def resolveSchemaCompatibility(
newSerializer: TypeSerializer[E#Value]): TypeSerializerSchemaCompatibility[E#Value] = {
newSerializer match {
case newEnumSerializer: EnumValueSerializer[_] => {
if (enumClass.equals(newEnumSerializer.enum.getClass)) {
for ((previousEnumConstant, idx) <- previousEnumConstants) {
val enumValue = try {
newEnumSerializer.enum(idx)
} catch {
case _: NoSuchElementException =>
// couldn't find an enum value for the given index
return TypeSerializerSchemaCompatibility.incompatible()
}
if (!previousEnumConstant.equals(enumValue.toString)) {
// compatible only if new enum constants are only appended,
// and original constants must be in the exact same order
return TypeSerializerSchemaCompatibility.compatibleAfterMigration()
}
}
TypeSerializerSchemaCompatibility.compatibleAsIs()
} else {
TypeSerializerSchemaCompatibility.incompatible()
if (!newSerializer.isInstanceOf[EnumValueSerializer[E]]) {
return TypeSerializerSchemaCompatibility.incompatible()
}
val newEnumSerializer = newSerializer.asInstanceOf[EnumValueSerializer[E]]
if (!enumClass.equals(newEnumSerializer.enum.getClass)) {
return TypeSerializerSchemaCompatibility.incompatible()
}
for ((previousEnumName, index) <- previousEnumConstants) {
try {
val newEnumName = newEnumSerializer.enum(index).toString
if (previousEnumName != newEnumName) {
return TypeSerializerSchemaCompatibility.incompatible()
}
} catch {
case _: NoSuchElementException =>
return TypeSerializerSchemaCompatibility.incompatible()
}
case _ => TypeSerializerSchemaCompatibility.incompatible()
}
TypeSerializerSchemaCompatibility.compatibleAsIs()
}
}
......
......@@ -96,19 +96,19 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
}
/**
* Check that removing enum fields requires migration
* Check that removing enum fields makes the snapshot incompatible.
*/
@Test
def checkRemovedField(): Unit = {
assertTrue(checkCompatibility(enumA, enumC).isCompatibleAfterMigration)
assertTrue(checkCompatibility(enumA, enumC).isIncompatible)
}
/**
* Check that changing the enum field order requires migration
* Check that changing the enum field order makes the snapshot incompatible.
*/
@Test
def checkDifferentFieldOrder(): Unit = {
assertTrue(checkCompatibility(enumA, enumD).isCompatibleAfterMigration)
assertTrue(checkCompatibility(enumA, enumD).isIncompatible)
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册