未验证 提交 6d0bba21 编写于 作者: K klion26 提交者: Aljoscha Krettek

[FLINK-11334][core] Migrate EnumValueSerializer to use new serialization compatibility abstractions

This commit migrate EnumValueSerializer to use new serialization compatibilty abstractions
  * add a new class `ScalaEnumSerializerSnapshot`
  * return a `ScalaEnumSerializerConfigSnapshot ` with `ScalaEnumSerializerSnapshot` when calling `EnumValueSerializer#snapshotConfiguration`
  * add a migration test `EnumValueSerializerSnapshotMigrationTest` to test the compatibility
  * remove function `EnumValueSerializer#ensureCompatibility()`
上级 5701a20e
......@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.typeutils
import java.io.IOException
import org.apache.flink.annotation.Internal
import org.apache.flink.api.common.typeutils.{CompatibilityResult, TypeSerializer, TypeSerializerConfigSnapshot}
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerConfigSnapshot, TypeSerializerSchemaCompatibility}
import org.apache.flink.api.common.typeutils.base.IntSerializer
import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
......@@ -75,43 +75,8 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[
// Serializer configuration snapshotting & compatibility
// --------------------------------------------------------------------------------------------
override def snapshotConfiguration(): EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E] = {
new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](enum)
}
override def ensureCompatibility(
configSnapshot: TypeSerializerConfigSnapshot[_]): CompatibilityResult[E#Value] = {
configSnapshot match {
case enumSerializerConfigSnapshot: EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] =>
val enumClass = enum.getClass.asInstanceOf[Class[E]]
if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) {
val previousEnumConstants: List[(String, Int)] =
enumSerializerConfigSnapshot.getEnumConstants
for ((previousEnumConstant, idx) <- previousEnumConstants) {
val enumValue = try {
enum(idx)
} catch {
case _: NoSuchElementException =>
// couldn't find an enum value for the given index
return CompatibilityResult.requiresMigration()
}
if (!previousEnumConstant.equals(enumValue.toString)) {
// compatible only if new enum constants are only appended,
// and original constants must be in the exact same order
return CompatibilityResult.requiresMigration()
}
}
CompatibilityResult.compatible()
} else {
CompatibilityResult.requiresMigration()
}
case _ => CompatibilityResult.requiresMigration()
}
override def snapshotConfiguration(): ScalaEnumSerializerSnapshot[E] = {
new ScalaEnumSerializerSnapshot[E](enum)
}
}
......@@ -181,6 +146,12 @@ object EnumValueSerializer {
finally if (inViewWrapper != null) inViewWrapper.close()
}
override def resolveSchemaCompatibility(
newSerializer: TypeSerializer[E#Value]): TypeSerializerSchemaCompatibility[E#Value] = {
val serializerSnapshot = new ScalaEnumSerializerSnapshot(enumClass, enumConstants)
serializerSnapshot.resolveSchemaCompatibility(newSerializer)
}
override def getVersion: Int = ScalaEnumSerializerConfigSnapshot.VERSION
def getEnumClass: Class[E] = enumClass
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala.typeutils
import java.io.IOException
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSchemaCompatibility, TypeSerializerSnapshot}
import org.apache.flink.api.java.typeutils.runtime.{DataInputViewStream, DataOutputViewStream}
import org.apache.flink.core.memory.{DataInputView, DataOutputView}
import org.apache.flink.util.{InstantiationUtil, Preconditions}
import scala.collection.mutable.ListBuffer
class ScalaEnumSerializerSnapshot[E <: Enumeration]
extends TypeSerializerSnapshot[E#Value] {
var enumClass: Class[E] = _
var previousEnumConstants: List[(String, Int)] = _
def this(enum: E) = {
this()
this.enumClass = Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]]
this.previousEnumConstants = enum.values.toList.map(x => (x.toString, x.id))
}
def this(enumClass: Class[E], previousEnumConstants: List[(String, Int)]) = {
this()
this.enumClass = Preconditions.checkNotNull(enumClass)
this.previousEnumConstants = Preconditions.checkNotNull(previousEnumConstants)
}
override def getCurrentVersion: Int = ScalaEnumSerializerSnapshot.VERSION
override def writeSnapshot(out: DataOutputView): Unit = {
val outViewWrapper = new DataOutputViewStream(out)
try {
out.writeUTF(enumClass.getName)
out.writeInt(previousEnumConstants.length)
for ((name, idx) <- previousEnumConstants) {
out.writeUTF(name)
out.writeInt(idx)
}
} finally if (outViewWrapper != null) outViewWrapper.close()
}
override def readSnapshot(
readVersion: Int, in: DataInputView, userCodeClassLoader: ClassLoader): Unit = {
val inViewWrapper = new DataInputViewStream(in)
try {
if (readVersion == 1) {
enumClass = InstantiationUtil.deserializeObject(
inViewWrapper, userCodeClassLoader)
// read null from input stream
InstantiationUtil.deserializeObject(inViewWrapper, userCodeClassLoader)
previousEnumConstants = List()
} else if (readVersion >= 2) {
enumClass = Class.forName(
in.readUTF(), true, userCodeClassLoader).asInstanceOf[Class[E]]
val length = in.readInt()
val listBuffer = ListBuffer[(String, Int)]()
for (_ <- 0 until length) {
val name = in.readUTF()
val idx = in.readInt()
listBuffer += ((name, idx))
}
previousEnumConstants = listBuffer.toList
} else {
throw new IOException(
s"Cannot deserialize ${getClass.getSimpleName} with version $readVersion.")
}
} catch {
case e: ClassNotFoundException =>
throw new IOException("The requested enum class cannot be found in classpath.", e)
}
finally if (inViewWrapper != null) inViewWrapper.close()
}
override def restoreSerializer(): TypeSerializer[E#Value] = {
enumClass.newInstance().asInstanceOf[TypeSerializer[E#Value]]
}
override def resolveSchemaCompatibility(
newSerializer: TypeSerializer[E#Value]): TypeSerializerSchemaCompatibility[E#Value] = {
newSerializer match {
case newEnumSerializer: EnumValueSerializer[_] => {
if (enumClass.equals(newEnumSerializer.enum.getClass)) {
for ((previousEnumConstant, idx) <- previousEnumConstants) {
val enumValue = try {
newEnumSerializer.enum(idx)
} catch {
case _: NoSuchElementException =>
// couldn't find an enum value for the given index
return TypeSerializerSchemaCompatibility.incompatible()
}
if (!previousEnumConstant.equals(enumValue.toString)) {
// compatible only if new enum constants are only appended,
// and original constants must be in the exact same order
return TypeSerializerSchemaCompatibility.compatibleAfterMigration()
}
}
TypeSerializerSchemaCompatibility.compatibleAsIs()
} else {
TypeSerializerSchemaCompatibility.incompatible()
}
}
case _ => TypeSerializerSchemaCompatibility.incompatible()
}
}
}
object ScalaEnumSerializerSnapshot {
val VERSION = 3
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.scala.typeutils
import java.util
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSnapshotMigrationTestBase}
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase.{TestSpecification, TestSpecifications}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.testutils.migration.MigrationVersion
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
/**
* Migration tests for the [[EnumValueSerializer]].
*/
@RunWith(classOf[Parameterized])
class EnumValueSerializerSnapshotMigrationTest(
spec: TestSpecification[Letters.Value])
extends TypeSerializerSnapshotMigrationTestBase[Letters.Value](spec) {}
object EnumValueSerializerSnapshotMigrationTest {
private val supplier =
new util.function.Supplier[EnumValueSerializer[Letters.type]] {
override def get(): EnumValueSerializer[Letters.type] =
new EnumValueSerializer(Letters)
}
@Parameterized.Parameters(name = "Test Specification = {0}")
def testSpecifications(): util.Collection[TestSpecification[_]] = {
val spec =
new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7)
spec.add(
"scala-enum-serializer",
classOf[EnumValueSerializer[Letters.Value]],
classOf[ScalaEnumSerializerSnapshot[Letters.Value]],
supplier
)
spec.get()
}
}
......@@ -35,7 +35,7 @@ class EnumValueSerializerTest extends TestLogger with JUnitSuiteLike {
val snapshot = enumSerializer.snapshotConfiguration()
assertFalse(enumSerializer.ensureCompatibility(snapshot).isRequiresMigration)
assertTrue(snapshot.resolveSchemaCompatibility(enumSerializer).isCompatibleAsIs)
}
}
......
......@@ -100,7 +100,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
*/
@Test
def checkRemovedField(): Unit = {
assertTrue(checkCompatibility(enumA, enumC).isIncompatible)
assertTrue(checkCompatibility(enumA, enumC).isCompatibleAfterMigration)
}
/**
......@@ -108,7 +108,7 @@ class EnumValueSerializerUpgradeTest extends TestLogger with JUnitSuiteLike {
*/
@Test
def checkDifferentFieldOrder(): Unit = {
assertTrue(checkCompatibility(enumA, enumD).isIncompatible)
assertTrue(checkCompatibility(enumA, enumD).isCompatibleAfterMigration)
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册