diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java index ef66403c2a727956b2f533cf4f92c75ad58b4699..a45508d73b3d0a6afea7975e78ae0b35b4b62ed6 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java @@ -29,9 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -43,13 +41,9 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Date; -import java.util.HashMap; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Random; @@ -347,12 +341,14 @@ public class PojoSerializerTest extends SerializerTestBase compatResult = pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer); - assertTrue(compatResult.isCompatibleAsIs()); + assertTrue(compatResult.isCompatibleWithReconfiguredSerializer()); + assertTrue(compatResult.getReconfiguredSerializer() instanceof PojoSerializer); // reconfigure - check reconfiguration result and that registration ids remains the same //assertEquals(ReconfigureResult.COMPATIBLE, pojoSerializer.reconfigure(pojoSerializerConfigSnapshot)); - assertEquals(subClassATag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class).intValue()); - assertEquals(subClassBTag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class).intValue()); + PojoSerializer reconfiguredPojoSerializer = (PojoSerializer) compatResult.getReconfiguredSerializer(); + assertEquals(subClassATag, reconfiguredPojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class).intValue()); + assertEquals(subClassBTag, reconfiguredPojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class).intValue()); } /** @@ -394,10 +390,13 @@ public class PojoSerializerTest extends SerializerTestBase compatResult = pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer); - assertTrue(compatResult.isCompatibleAsIs()); - assertEquals(2, pojoSerializer.getSubclassSerializerCache().size()); - assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class)); - assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class)); + assertTrue(compatResult.isCompatibleWithReconfiguredSerializer()); + assertTrue(compatResult.getReconfiguredSerializer() instanceof PojoSerializer); + + PojoSerializer reconfiguredPojoSerializer = (PojoSerializer) compatResult.getReconfiguredSerializer(); + assertEquals(2, reconfiguredPojoSerializer.getSubclassSerializerCache().size()); + assertTrue(reconfiguredPojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class)); + assertTrue(reconfiguredPojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class)); } /** @@ -459,127 +458,15 @@ public class PojoSerializerTest extends SerializerTestBase compatResult = pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer); - assertTrue(compatResult.isCompatibleAsIs()); - assertEquals(2, pojoSerializer.getSubclassSerializerCache().size()); - assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class)); - assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class)); - assertEquals(2, pojoSerializer.getRegisteredClasses().size()); - assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassA.class)); - assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassB.class)); - } - - /** - * Verifies that reconfiguration reorders the fields of the new Pojo serializer to remain the same. - */ - @Test - public void testReconfigureWithDifferentFieldOrder() throws Exception { - Field[] mockOriginalFieldOrder = { - TestUserClass.class.getField("dumm4"), - TestUserClass.class.getField("dumm3"), - TestUserClass.class.getField("nestedClass"), - TestUserClass.class.getField("dumm1"), - TestUserClass.class.getField("dumm2"), - TestUserClass.class.getField("dumm5"), - }; - - // creating this serializer just for generating config snapshots of the field serializers - PojoSerializer ser = (PojoSerializer) type.createSerializer(new ExecutionConfig()); - - LinkedHashMap, TypeSerializerSnapshot>> mockOriginalFieldToSerializerConfigSnapshot = - new LinkedHashMap<>(mockOriginalFieldOrder.length); - mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[0].getName(), - new Tuple2<>( - ser.getFieldSerializers()[3], - ser.getFieldSerializers()[3].snapshotConfiguration())); - mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[1].getName(), - new Tuple2<>( - ser.getFieldSerializers()[2], - ser.getFieldSerializers()[2].snapshotConfiguration())); - mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[2].getName(), - new Tuple2<>( - ser.getFieldSerializers()[5], - ser.getFieldSerializers()[5].snapshotConfiguration())); - mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[3].getName(), - new Tuple2<>( - ser.getFieldSerializers()[0], - ser.getFieldSerializers()[0].snapshotConfiguration())); - mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[4].getName(), - new Tuple2<>( - ser.getFieldSerializers()[1], - ser.getFieldSerializers()[1].snapshotConfiguration())); - mockOriginalFieldToSerializerConfigSnapshot.put( - mockOriginalFieldOrder[5].getName(), - new Tuple2<>( - ser.getFieldSerializers()[4], - ser.getFieldSerializers()[4].snapshotConfiguration())); - - PojoSerializer pojoSerializer = (PojoSerializer) type.createSerializer(new ExecutionConfig()); - - assertEquals(TestUserClass.class.getField("dumm1"), pojoSerializer.getFields()[0]); - assertEquals(TestUserClass.class.getField("dumm2"), pojoSerializer.getFields()[1]); - assertEquals(TestUserClass.class.getField("dumm3"), pojoSerializer.getFields()[2]); - assertEquals(TestUserClass.class.getField("dumm4"), pojoSerializer.getFields()[3]); - assertEquals(TestUserClass.class.getField("dumm5"), pojoSerializer.getFields()[4]); - assertEquals(TestUserClass.class.getField("nestedClass"), pojoSerializer.getFields()[5]); - - PojoSerializer.PojoSerializerConfigSnapshot mockPreviousConfigSnapshot = - new PojoSerializer.PojoSerializerConfigSnapshot<>( - TestUserClass.class, - mockOriginalFieldToSerializerConfigSnapshot, // this mocks the previous field order - new LinkedHashMap<>(), // empty; irrelevant for this test - new HashMap<>()); // empty; irrelevant for this test - - // reconfigure - check reconfiguration result and that fields are reordered to the previous order - TypeSerializerSchemaCompatibility compatResult = - mockPreviousConfigSnapshot.resolveSchemaCompatibility(pojoSerializer); - assertTrue(compatResult.isCompatibleAsIs()); - int i = 0; - for (Field field : mockOriginalFieldOrder) { - assertEquals(field, pojoSerializer.getFields()[i]); - i++; - } - } - - private static void verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure( - PojoSerializer.PojoSerializerConfigSnapshot original, - PojoSerializer.PojoSerializerConfigSnapshot deserializedConfig) { - - LinkedHashMap, TypeSerializerSnapshot>> originalFieldSerializersAndConfs = - original.getFieldToSerializerConfigSnapshot(); - for (Map.Entry, TypeSerializerSnapshot>> entry - : deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) { - - Assert.assertTrue(entry.getValue().f0 instanceof UnloadableDummyTypeSerializer); - - if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) { - verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure( - (PojoSerializer.PojoSerializerConfigSnapshot) originalFieldSerializersAndConfs.get(entry.getKey()).f1, - (PojoSerializer.PojoSerializerConfigSnapshot) entry.getValue().f1); - } else { - Assert.assertEquals(originalFieldSerializersAndConfs.get(entry.getKey()).f1, entry.getValue().f1); - } - } - - LinkedHashMap, Tuple2, TypeSerializerSnapshot>> originalRegistrations = - original.getRegisteredSubclassesToSerializerConfigSnapshots(); - - for (Map.Entry, Tuple2, TypeSerializerSnapshot>> entry - : deserializedConfig.getRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) { - - Assert.assertTrue(entry.getValue().f0 instanceof UnloadableDummyTypeSerializer); - - if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) { - verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure( - (PojoSerializer.PojoSerializerConfigSnapshot) originalRegistrations.get(entry.getKey()).f1, - (PojoSerializer.PojoSerializerConfigSnapshot) entry.getValue().f1); - } else { - Assert.assertEquals(originalRegistrations.get(entry.getKey()).f1, entry.getValue().f1); - } - } + assertTrue(compatResult.isCompatibleWithReconfiguredSerializer()); + assertTrue(compatResult.getReconfiguredSerializer() instanceof PojoSerializer); + + PojoSerializer reconfiguredPojoSerializer = (PojoSerializer) compatResult.getReconfiguredSerializer(); + assertEquals(2, reconfiguredPojoSerializer.getSubclassSerializerCache().size()); + assertTrue(reconfiguredPojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class)); + assertTrue(reconfiguredPojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class)); + assertEquals(2, reconfiguredPojoSerializer.getRegisteredClasses().size()); + assertTrue(reconfiguredPojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassA.class)); + assertTrue(reconfiguredPojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassB.class)); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java index 3f6f09be7f4ead39f286f847d3a4c75268432e61..a02741e738a980ba8511396da1894780835b4242 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/typeserializerupgrade/PojoSerializerUpgradeTest.java @@ -23,8 +23,6 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.MapState; -import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReducingState; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.common.state.ValueState; @@ -225,74 +223,38 @@ public class PojoSerializerUpgradeTest extends TestLogger { } /** - * Adding fields to a POJO as keyed state should require a state migration. + * Adding fields to a POJO as keyed state should succeed. */ @Test public void testAdditionalFieldWithKeyedState() throws Exception { - try { - testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, true); - fail("Expected a state migration exception."); - } catch (Exception e) { - if (CommonTestUtils.containsCause(e, StateMigrationException.class)) { - // StateMigrationException expected - } else { - throw e; - } - } + testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, true); } /** - * Adding fields to a POJO as operator state should require a state migration. + * Adding fields to a POJO as operator state should succeed. */ @Test public void testAdditionalFieldWithOperatorState() throws Exception { - try { - testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, false); - fail("Expected a state migration exception."); - } catch (Exception e) { - if (CommonTestUtils.containsCause(e, StateMigrationException.class)) { - // StateMigrationException expected - } else { - throw e; - } - } + testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, false); } /** - * Removing fields from a POJO as keyed state should require a state migration. + * Removing fields from a POJO as keyed state should succeed. */ @Test public void testMissingFieldWithKeyedState() throws Exception { - try { - testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, true); - fail("Expected a state migration exception."); - } catch (Exception e) { - if (CommonTestUtils.containsCause(e, StateMigrationException.class)) { - // StateMigrationException expected - } else { - throw e; - } - } + testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, true); } /** - * Removing fields from a POJO as operator state should require a state migration. + * Removing fields from a POJO as operator state should succeed. */ @Test public void testMissingFieldWithOperatorState() throws Exception { - try { - testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, false); - fail("Expected a state migration exception."); - } catch (Exception e) { - if (CommonTestUtils.containsCause(e, StateMigrationException.class)) { - // StateMigrationException expected - } else { - throw e; - } - } + testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, false); } - public void testPojoSerializerUpgrade(String classSourceA, String classSourceB, boolean hasBField, boolean isKeyedState) throws Exception { + private void testPojoSerializerUpgrade(String classSourceA, String classSourceB, boolean hasBField, boolean isKeyedState) throws Exception { final Configuration taskConfiguration = new Configuration(); final ExecutionConfig executionConfig = new ExecutionConfig(); final KeySelector keySelector = new IdentityKeySelector<>(); @@ -424,7 +386,6 @@ public class PojoSerializerUpgradeTest extends TestLogger { // keyed states private transient ValueState keyedValueState; - private transient MapState keyedMapState; private transient ListState keyedListState; private transient ReducingState keyedReducingState; @@ -436,7 +397,7 @@ public class PojoSerializerUpgradeTest extends TestLogger { private transient Field fieldA; private transient Field fieldB; - public StatefulMapper(boolean keyed, boolean verify, boolean hasBField) { + StatefulMapper(boolean keyed, boolean verify, boolean hasBField) { this.keyed = keyed; this.verify = verify; this.hasBField = hasBField; @@ -456,9 +417,6 @@ public class PojoSerializerUpgradeTest extends TestLogger { if (keyed) { assertEquals(pojo, keyedValueState.value()); - assertTrue(keyedMapState.contains(pojo)); - assertEquals(pojo, keyedMapState.get(pojo)); - Iterator listIterator = keyedListState.get().iterator(); boolean elementFound = false; @@ -488,7 +446,6 @@ public class PojoSerializerUpgradeTest extends TestLogger { } else { if (keyed) { keyedValueState.update(pojo); - keyedMapState.put(pojo, pojo); keyedListState.add(pojo); keyedReducingState.add(pojo); } else { @@ -505,6 +462,7 @@ public class PojoSerializerUpgradeTest extends TestLogger { } + @SuppressWarnings("unchecked") @Override public void initializeState(FunctionInitializationContext context) throws Exception { pojoClass = getRuntimeContext().getUserCodeClassLoader().loadClass(POJO_NAME); @@ -520,8 +478,6 @@ public class PojoSerializerUpgradeTest extends TestLogger { if (keyed) { keyedValueState = context.getKeyedStateStore().getState( new ValueStateDescriptor<>("keyedValueState", (Class) pojoClass)); - keyedMapState = context.getKeyedStateStore().getMapState( - new MapStateDescriptor<>("keyedMapState", (Class) pojoClass, (Class) pojoClass)); keyedListState = context.getKeyedStateStore().getListState( new ListStateDescriptor<>("keyedListState", (Class) pojoClass));