[FLINK-11485] [tests] Adjust existing PojoSerializer upgrade tests

The following tests have been adjusted to new scope of functionality
related to upgrading the PojoSerializer: PojoSerializerTest and
PojoSerializerUpgradeTest.

In PojoSerializerTest, assertions that were supposed to check for
"isCompatibleWithReconfiguredSerializer" cases have been adjusted to
correctly do so. Also, a test where changing field order was tested was
removed, since the new PojoSerializerSnapshot always assumes that fields
have been sorted already by the type extractor.

In PojoSerializerUpgradeTest, tests which remove / add fields to POJO
classes were updated to no longer expect failures. They were expecting
failures because POJOs were not evolvable in the past.

The updates to these tests also provide coverage for the POJO schema
evolution feature.

This closes #7759.
上级 1850193b
......@@ -29,9 +29,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
......@@ -43,13 +41,9 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
......@@ -347,12 +341,14 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestUserClass> compatResult =
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
assertTrue(compatResult.isCompatibleAsIs());
assertTrue(compatResult.isCompatibleWithReconfiguredSerializer());
assertTrue(compatResult.getReconfiguredSerializer() instanceof PojoSerializer);
// reconfigure - check reconfiguration result and that registration ids remains the same
//assertEquals(ReconfigureResult.COMPATIBLE, pojoSerializer.reconfigure(pojoSerializerConfigSnapshot));
assertEquals(subClassATag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class).intValue());
assertEquals(subClassBTag, pojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class).intValue());
PojoSerializer<TestUserClass> reconfiguredPojoSerializer = (PojoSerializer<TestUserClass>) compatResult.getReconfiguredSerializer();
assertEquals(subClassATag, reconfiguredPojoSerializer.getRegisteredClasses().get(SubTestUserClassA.class).intValue());
assertEquals(subClassBTag, reconfiguredPojoSerializer.getRegisteredClasses().get(SubTestUserClassB.class).intValue());
}
/**
......@@ -394,10 +390,13 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestUserClass> compatResult =
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
assertTrue(compatResult.isCompatibleAsIs());
assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
assertTrue(compatResult.isCompatibleWithReconfiguredSerializer());
assertTrue(compatResult.getReconfiguredSerializer() instanceof PojoSerializer);
PojoSerializer<TestUserClass> reconfiguredPojoSerializer = (PojoSerializer<TestUserClass>) compatResult.getReconfiguredSerializer();
assertEquals(2, reconfiguredPojoSerializer.getSubclassSerializerCache().size());
assertTrue(reconfiguredPojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
assertTrue(reconfiguredPojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
}
/**
......@@ -459,127 +458,15 @@ public class PojoSerializerTest extends SerializerTestBase<PojoSerializerTest.Te
@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestUserClass> compatResult =
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
assertTrue(compatResult.isCompatibleAsIs());
assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
assertTrue(pojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
assertEquals(2, pojoSerializer.getRegisteredClasses().size());
assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassA.class));
assertTrue(pojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassB.class));
}
/**
* Verifies that reconfiguration reorders the fields of the new Pojo serializer to remain the same.
*/
@Test
public void testReconfigureWithDifferentFieldOrder() throws Exception {
Field[] mockOriginalFieldOrder = {
TestUserClass.class.getField("dumm4"),
TestUserClass.class.getField("dumm3"),
TestUserClass.class.getField("nestedClass"),
TestUserClass.class.getField("dumm1"),
TestUserClass.class.getField("dumm2"),
TestUserClass.class.getField("dumm5"),
};
// creating this serializer just for generating config snapshots of the field serializers
PojoSerializer<TestUserClass> ser = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> mockOriginalFieldToSerializerConfigSnapshot =
new LinkedHashMap<>(mockOriginalFieldOrder.length);
mockOriginalFieldToSerializerConfigSnapshot.put(
mockOriginalFieldOrder[0].getName(),
new Tuple2<>(
ser.getFieldSerializers()[3],
ser.getFieldSerializers()[3].snapshotConfiguration()));
mockOriginalFieldToSerializerConfigSnapshot.put(
mockOriginalFieldOrder[1].getName(),
new Tuple2<>(
ser.getFieldSerializers()[2],
ser.getFieldSerializers()[2].snapshotConfiguration()));
mockOriginalFieldToSerializerConfigSnapshot.put(
mockOriginalFieldOrder[2].getName(),
new Tuple2<>(
ser.getFieldSerializers()[5],
ser.getFieldSerializers()[5].snapshotConfiguration()));
mockOriginalFieldToSerializerConfigSnapshot.put(
mockOriginalFieldOrder[3].getName(),
new Tuple2<>(
ser.getFieldSerializers()[0],
ser.getFieldSerializers()[0].snapshotConfiguration()));
mockOriginalFieldToSerializerConfigSnapshot.put(
mockOriginalFieldOrder[4].getName(),
new Tuple2<>(
ser.getFieldSerializers()[1],
ser.getFieldSerializers()[1].snapshotConfiguration()));
mockOriginalFieldToSerializerConfigSnapshot.put(
mockOriginalFieldOrder[5].getName(),
new Tuple2<>(
ser.getFieldSerializers()[4],
ser.getFieldSerializers()[4].snapshotConfiguration()));
PojoSerializer<TestUserClass> pojoSerializer = (PojoSerializer<TestUserClass>) type.createSerializer(new ExecutionConfig());
assertEquals(TestUserClass.class.getField("dumm1"), pojoSerializer.getFields()[0]);
assertEquals(TestUserClass.class.getField("dumm2"), pojoSerializer.getFields()[1]);
assertEquals(TestUserClass.class.getField("dumm3"), pojoSerializer.getFields()[2]);
assertEquals(TestUserClass.class.getField("dumm4"), pojoSerializer.getFields()[3]);
assertEquals(TestUserClass.class.getField("dumm5"), pojoSerializer.getFields()[4]);
assertEquals(TestUserClass.class.getField("nestedClass"), pojoSerializer.getFields()[5]);
PojoSerializer.PojoSerializerConfigSnapshot<TestUserClass> mockPreviousConfigSnapshot =
new PojoSerializer.PojoSerializerConfigSnapshot<>(
TestUserClass.class,
mockOriginalFieldToSerializerConfigSnapshot, // this mocks the previous field order
new LinkedHashMap<>(), // empty; irrelevant for this test
new HashMap<>()); // empty; irrelevant for this test
// reconfigure - check reconfiguration result and that fields are reordered to the previous order
TypeSerializerSchemaCompatibility<TestUserClass> compatResult =
mockPreviousConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
assertTrue(compatResult.isCompatibleAsIs());
int i = 0;
for (Field field : mockOriginalFieldOrder) {
assertEquals(field, pojoSerializer.getFields()[i]);
i++;
}
}
private static void verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
PojoSerializer.PojoSerializerConfigSnapshot<?> original,
PojoSerializer.PojoSerializerConfigSnapshot<?> deserializedConfig) {
LinkedHashMap<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> originalFieldSerializersAndConfs =
original.getFieldToSerializerConfigSnapshot();
for (Map.Entry<String, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> entry
: deserializedConfig.getFieldToSerializerConfigSnapshot().entrySet()) {
Assert.assertTrue(entry.getValue().f0 instanceof UnloadableDummyTypeSerializer);
if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) {
verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
(PojoSerializer.PojoSerializerConfigSnapshot<?>) originalFieldSerializersAndConfs.get(entry.getKey()).f1,
(PojoSerializer.PojoSerializerConfigSnapshot<?>) entry.getValue().f1);
} else {
Assert.assertEquals(originalFieldSerializersAndConfs.get(entry.getKey()).f1, entry.getValue().f1);
}
}
LinkedHashMap<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> originalRegistrations =
original.getRegisteredSubclassesToSerializerConfigSnapshots();
for (Map.Entry<Class<?>, Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> entry
: deserializedConfig.getRegisteredSubclassesToSerializerConfigSnapshots().entrySet()) {
Assert.assertTrue(entry.getValue().f0 instanceof UnloadableDummyTypeSerializer);
if (entry.getValue().f1 instanceof PojoSerializer.PojoSerializerConfigSnapshot) {
verifyPojoSerializerConfigSnapshotWithSerializerSerializationFailure(
(PojoSerializer.PojoSerializerConfigSnapshot<?>) originalRegistrations.get(entry.getKey()).f1,
(PojoSerializer.PojoSerializerConfigSnapshot<?>) entry.getValue().f1);
} else {
Assert.assertEquals(originalRegistrations.get(entry.getKey()).f1, entry.getValue().f1);
}
}
assertTrue(compatResult.isCompatibleWithReconfiguredSerializer());
assertTrue(compatResult.getReconfiguredSerializer() instanceof PojoSerializer);
PojoSerializer<TestUserClass> reconfiguredPojoSerializer = (PojoSerializer<TestUserClass>) compatResult.getReconfiguredSerializer();
assertEquals(2, reconfiguredPojoSerializer.getSubclassSerializerCache().size());
assertTrue(reconfiguredPojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassA.class));
assertTrue(reconfiguredPojoSerializer.getSubclassSerializerCache().containsKey(SubTestUserClassB.class));
assertEquals(2, reconfiguredPojoSerializer.getRegisteredClasses().size());
assertTrue(reconfiguredPojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassA.class));
assertTrue(reconfiguredPojoSerializer.getRegisteredClasses().containsKey(SubTestUserClassB.class));
}
}
......@@ -23,8 +23,6 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
......@@ -225,74 +223,38 @@ public class PojoSerializerUpgradeTest extends TestLogger {
}
/**
* Adding fields to a POJO as keyed state should require a state migration.
* Adding fields to a POJO as keyed state should succeed.
*/
@Test
public void testAdditionalFieldWithKeyedState() throws Exception {
try {
testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, true);
fail("Expected a state migration exception.");
} catch (Exception e) {
if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
// StateMigrationException expected
} else {
throw e;
}
}
testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, true);
}
/**
* Adding fields to a POJO as operator state should require a state migration.
* Adding fields to a POJO as operator state should succeed.
*/
@Test
public void testAdditionalFieldWithOperatorState() throws Exception {
try {
testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, false);
fail("Expected a state migration exception.");
} catch (Exception e) {
if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
// StateMigrationException expected
} else {
throw e;
}
}
testPojoSerializerUpgrade(SOURCE_A, SOURCE_D, true, false);
}
/**
* Removing fields from a POJO as keyed state should require a state migration.
* Removing fields from a POJO as keyed state should succeed.
*/
@Test
public void testMissingFieldWithKeyedState() throws Exception {
try {
testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, true);
fail("Expected a state migration exception.");
} catch (Exception e) {
if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
// StateMigrationException expected
} else {
throw e;
}
}
testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, true);
}
/**
* Removing fields from a POJO as operator state should require a state migration.
* Removing fields from a POJO as operator state should succeed.
*/
@Test
public void testMissingFieldWithOperatorState() throws Exception {
try {
testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, false);
fail("Expected a state migration exception.");
} catch (Exception e) {
if (CommonTestUtils.containsCause(e, StateMigrationException.class)) {
// StateMigrationException expected
} else {
throw e;
}
}
testPojoSerializerUpgrade(SOURCE_A, SOURCE_E, false, false);
}
public void testPojoSerializerUpgrade(String classSourceA, String classSourceB, boolean hasBField, boolean isKeyedState) throws Exception {
private void testPojoSerializerUpgrade(String classSourceA, String classSourceB, boolean hasBField, boolean isKeyedState) throws Exception {
final Configuration taskConfiguration = new Configuration();
final ExecutionConfig executionConfig = new ExecutionConfig();
final KeySelector<Long, Long> keySelector = new IdentityKeySelector<>();
......@@ -424,7 +386,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
// keyed states
private transient ValueState<Object> keyedValueState;
private transient MapState<Object, Object> keyedMapState;
private transient ListState<Object> keyedListState;
private transient ReducingState<Object> keyedReducingState;
......@@ -436,7 +397,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
private transient Field fieldA;
private transient Field fieldB;
public StatefulMapper(boolean keyed, boolean verify, boolean hasBField) {
StatefulMapper(boolean keyed, boolean verify, boolean hasBField) {
this.keyed = keyed;
this.verify = verify;
this.hasBField = hasBField;
......@@ -456,9 +417,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
if (keyed) {
assertEquals(pojo, keyedValueState.value());
assertTrue(keyedMapState.contains(pojo));
assertEquals(pojo, keyedMapState.get(pojo));
Iterator<Object> listIterator = keyedListState.get().iterator();
boolean elementFound = false;
......@@ -488,7 +446,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
} else {
if (keyed) {
keyedValueState.update(pojo);
keyedMapState.put(pojo, pojo);
keyedListState.add(pojo);
keyedReducingState.add(pojo);
} else {
......@@ -505,6 +462,7 @@ public class PojoSerializerUpgradeTest extends TestLogger {
}
@SuppressWarnings("unchecked")
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
pojoClass = getRuntimeContext().getUserCodeClassLoader().loadClass(POJO_NAME);
......@@ -520,8 +478,6 @@ public class PojoSerializerUpgradeTest extends TestLogger {
if (keyed) {
keyedValueState = context.getKeyedStateStore().getState(
new ValueStateDescriptor<>("keyedValueState", (Class<Object>) pojoClass));
keyedMapState = context.getKeyedStateStore().getMapState(
new MapStateDescriptor<>("keyedMapState", (Class<Object>) pojoClass, (Class<Object>) pojoClass));
keyedListState = context.getKeyedStateStore().getListState(
new ListStateDescriptor<>("keyedListState", (Class<Object>) pojoClass));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册