提交 20ebad04 编写于 作者: T Till Rohrmann

[FLINK-3088] [serialization] Fix copy method of TypeSerializers which use Kryo

Some TypeSerializers (WritableSerializer, ValueSerializer, and AvroSerializer) and
comparators (WritableComparator and ValueComparator) use Kryo to copy records.
If the Kryo serializer cannot copy a record, the copy method fails.
This failure is unnecessary, because the element can also be copied by serializing
the record to a byte array and deserializing it from that array. This PR adds
this fallback behaviour to the respective classes.

Adds KryoUtils tool with copy method to avoid code duplication

This closes #1415.

Adds comments to KryoUtils functions
上级 209ae6c9
......@@ -279,9 +279,16 @@ public final class InstantiationUtil {
}
InputViewDataInputStreamWrapper inputViewWrapper = new InputViewDataInputStreamWrapper(new DataInputStream(new ByteArrayInputStream(buf)));
return serializer.deserialize(inputViewWrapper);
}
public static <T> T deserializeFromByteArray(TypeSerializer<T> serializer, T reuse, byte[] buf) throws IOException {
if (buf == null) {
throw new NullPointerException("Byte array to deserialize from must not be null.");
}
T record = serializer.createInstance();
return serializer.deserialize(record, inputViewWrapper);
InputViewDataInputStreamWrapper inputViewWrapper = new InputViewDataInputStreamWrapper(new DataInputStream(new ByteArrayInputStream(buf)));
return serializer.deserialize(reuse, inputViewWrapper);
}
@SuppressWarnings("unchecked")
......
......@@ -18,13 +18,8 @@
package org.apache.flink.api.java.typeutils.runtime;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Preconditions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectDatumReader;
......@@ -37,6 +32,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;
import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;
/**
......@@ -96,28 +92,15 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
@Override
public T copy(T from) {
checkKryoInitialized();
return this.kryo.copy(from);
return KryoUtils.copy(from, kryo, this);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
try {
return this.kryo.copy(from);
} catch(KryoException ke) {
// kryo was unable to copy it, so we do it through serialization:
ByteArrayOutputStream baout = new ByteArrayOutputStream();
Output output = new Output(baout);
kryo.writeObject(output, from);
output.close();
ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
Input input = new Input(bain);
return (T)kryo.readObject(input, from.getClass());
}
return KryoUtils.copy(from, reuse, kryo, this);
}
@Override
......@@ -174,6 +157,11 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);
// register Avro types.
this.kryo.register(GenericData.Array.class, new Serializers.SpecificInstanceCollectionSerializerForArrayList());
this.kryo.register(Utf8.class);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
/**
 * Convenience methods for copying records with Kryo, falling back to a
 * serialize/deserialize round trip through a {@link TypeSerializer} when Kryo
 * itself cannot copy the record.
 */
public class KryoUtils {

	/**
	 * Tries to copy the given record using the provided Kryo instance. If Kryo throws a
	 * {@link KryoException}, the record is instead copied by serializing it into a byte
	 * array with the given serializer and deserializing it from that array.
	 *
	 * @param from Element to copy
	 * @param kryo Kryo instance to use
	 * @param serializer TypeSerializer which is used in case of a Kryo failure
	 * @param <T> Type of the element to be copied
	 * @return Copied element
	 * @throws RuntimeException if the fallback fails with an {@link IOException}; the
	 *         {@link IOException} is the cause and the original {@link KryoException}
	 *         is attached as a suppressed exception
	 */
	public static <T> T copy(T from, Kryo kryo, TypeSerializer<T> serializer) {
		try {
			return kryo.copy(from);
		} catch (KryoException ke) {
			// Kryo could not copy the object --> try to serialize/deserialize the object
			try {
				byte[] byteArray = InstantiationUtil.serializeToByteArray(serializer, from);
				return InstantiationUtil.deserializeFromByteArray(serializer, byteArray);
			} catch (IOException ioe) {
				throw copyFailure(ke, ioe);
			}
		}
	}

	/**
	 * Tries to copy the given record using the provided Kryo instance. If Kryo throws a
	 * {@link KryoException}, the record is instead copied by serializing it into a byte
	 * array with the given serializer and deserializing it from that array.
	 *
	 * <p>The reuse element is only handed to the serializer on the fallback path; when
	 * Kryo succeeds, the result of {@code kryo.copy(from)} is returned and reuse is not
	 * touched.
	 *
	 * @param from Element to copy
	 * @param reuse Reuse element for the deserialization
	 * @param kryo Kryo instance to use
	 * @param serializer TypeSerializer which is used in case of a Kryo failure
	 * @param <T> Type of the element to be copied
	 * @return Copied element
	 * @throws RuntimeException if the fallback fails with an {@link IOException}; the
	 *         {@link IOException} is the cause and the original {@link KryoException}
	 *         is attached as a suppressed exception
	 */
	public static <T> T copy(T from, T reuse, Kryo kryo, TypeSerializer<T> serializer) {
		try {
			return kryo.copy(from);
		} catch (KryoException ke) {
			// Kryo could not copy the object --> try to serialize/deserialize the object
			try {
				byte[] byteArray = InstantiationUtil.serializeToByteArray(serializer, from);
				return InstantiationUtil.deserializeFromByteArray(serializer, reuse, byteArray);
			} catch (IOException ioe) {
				throw copyFailure(ke, ioe);
			}
		}
	}

	/**
	 * Builds the exception thrown when both copy strategies failed. The Kryo failure is
	 * kept as a suppressed exception so that the reason the fast path was abandoned is
	 * still visible in the stack trace.
	 */
	private static RuntimeException copyFailure(KryoException kryoFailure, IOException fallbackFailure) {
		RuntimeException failure = new RuntimeException(
				"Could not copy object by serializing/deserializing" + " it.", fallbackFailure);
		failure.addSuppressed(kryoFailure);
		return failure;
	}
}
......@@ -29,6 +29,7 @@ import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;
/**
* Comparator for all Value types that extend Key
......@@ -63,7 +64,8 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
@Override
public void setReference(T toCompare) {
checkKryoInitialized();
reference = this.kryo.copy(toCompare);
reference = KryoUtils.copy(toCompare, kryo, new ValueSerializer<T>(type));
}
@Override
......@@ -138,6 +140,11 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);
this.kryo.setAsmEnabled(true);
this.kryo.register(type);
}
......
......@@ -28,6 +28,7 @@ import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;
/**
* Serializer for {@link Value} types. Uses the value's serialization methods, and uses
......@@ -71,13 +72,15 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
@Override
public T copy(T from) {
checkKryoInitialized();
return this.kryo.copy(from);
return KryoUtils.copy(from, kryo, this);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
return this.kryo.copy(from);
return KryoUtils.copy(from, reuse, kryo, this);
}
@Override
......@@ -114,6 +117,11 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);
this.kryo.setAsmEnabled(true);
this.kryo.register(type);
}
......
......@@ -29,6 +29,7 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.io.Writable;
import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;
public class WritableComparator<T extends Writable & Comparable<T>> extends TypeComparator<T> {
......@@ -60,7 +61,8 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
@Override
public void setReference(T toCompare) {
checkKryoInitialized();
reference = this.kryo.copy(toCompare);
reference = KryoUtils.copy(toCompare, kryo, new WritableSerializer<T>(type));
}
@Override
......@@ -163,6 +165,11 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);
this.kryo.setAsmEnabled(true);
this.kryo.register(type);
}
......
......@@ -18,7 +18,6 @@
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
......@@ -28,6 +27,9 @@ import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import com.esotericsoftware.kryo.Kryo;
import org.objenesis.strategy.StdInstantiatorStrategy;
import java.io.IOException;
public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
......@@ -51,17 +53,21 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
}
return InstantiationUtil.instantiate(typeClass);
}
@Override
public T copy(T from) {
checkKryoInitialized();
return this.kryo.copy(from);
return KryoUtils.copy(from, kryo, this);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
return this.kryo.copy(from);
return KryoUtils.copy(from, reuse, kryo, this);
}
@Override
......@@ -113,6 +119,11 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();
Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);
this.kryo.setAsmEnabled(true);
this.kryo.register(typeClass);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import java.util.UUID;
/**
 * Runs the {@link ComparatorTestBase} checks against a {@link ValueComparator}
 * for the UUID-backed {@link ValueID} type.
 */
public class ValueComparatorUUIDTest extends ComparatorTestBase<ValueID> {

	/** Creates the comparator under test for the requested sort direction. */
	@Override
	protected TypeComparator<ValueID> createComparator(boolean ascending) {
		final TypeComparator<ValueID> comparator =
				new ValueComparator<ValueID>(ascending, ValueID.class);
		return comparator;
	}

	/** Creates the serializer that matches the comparator's element type. */
	@Override
	protected TypeSerializer<ValueID> createSerializer() {
		final TypeSerializer<ValueID> serializer = new ValueSerializer<ValueID>(ValueID.class);
		return serializer;
	}

	/** Supplies three IDs listed in ascending UUID order. */
	@Override
	protected ValueID[] getSortedTestData() {
		final ValueID smallest = new ValueID(new UUID(0, 0));
		final ValueID middle = new ValueID(new UUID(1, 0));
		final ValueID largest = new ValueID(new UUID(1, 1));
		return new ValueID[] {smallest, middle, largest};
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Value;
import java.io.IOException;
import java.util.UUID;
/**
 * Test fixture: a {@link Value} that wraps a {@link UUID} and orders, compares and
 * hashes by that UUID.
 */
public class ValueID implements Value, Comparable<ValueID> {
	private static final long serialVersionUID = -562791433077971752L;

	private UUID id;

	/**
	 * Creates an ID backed by a freshly generated random UUID.
	 * NOTE(review): presumably required so the serializer can instantiate the type
	 * reflectively before calling {@link #read(DataInputView)} - confirm.
	 */
	public ValueID() {
		id = UUID.randomUUID();
	}

	/**
	 * Creates an ID backed by the given UUID.
	 *
	 * @param id UUID to wrap
	 */
	public ValueID(UUID id) {
		this.id = id;
	}

	/** Orders IDs by the natural ordering of their UUIDs. */
	@Override
	public int compareTo(ValueID o) {
		return id.compareTo(o.id);
	}

	/** Writes the UUID as two longs: most significant bits first, then least significant. */
	@Override
	public void write(DataOutputView out) throws IOException {
		out.writeLong(id.getMostSignificantBits());
		out.writeLong(id.getLeastSignificantBits());
	}

	/** Replaces the UUID with one rebuilt from two longs, in the order written by {@link #write}. */
	@Override
	public void read(DataInputView in) throws IOException {
		id = new UUID(in.readLong(), in.readLong());
	}

	/** Two IDs are equal when their UUIDs are equal. */
	@Override
	public boolean equals(Object obj) {
		if (obj instanceof ValueID) {
			ValueID other = (ValueID) obj;
			return id.equals(other.id);
		} else {
			return false;
		}
	}

	@Override
	public int hashCode() {
		return id.hashCode();
	}

	/**
	 * Returns the textual form of the wrapped UUID so that assertion failures show the
	 * actual value instead of an object identity string.
	 */
	@Override
	public String toString() {
		return String.valueOf(id);
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import java.util.UUID;
/**
 * Runs the {@link SerializerTestBase} checks against a {@link ValueSerializer}
 * for the UUID-backed {@link ValueID} type.
 */
public class ValueSerializerUUIDTest extends SerializerTestBase<ValueID> {

	/** Creates the serializer under test. */
	@Override
	protected TypeSerializer<ValueID> createSerializer() {
		final TypeSerializer<ValueID> serializer = new ValueSerializer<ValueID>(ValueID.class);
		return serializer;
	}

	/**
	 * Reports the expected serialized length.
	 * NOTE(review): -1 presumably tells the test base that there is no fixed length - confirm.
	 */
	@Override
	protected int getLength() {
		final int length = -1;
		return length;
	}

	/** Names the element type handled by the serializer under test. */
	@Override
	protected Class<ValueID> getTypeClass() {
		final Class<ValueID> typeClass = ValueID.class;
		return typeClass;
	}

	/** Supplies three IDs built from fixed UUIDs. */
	@Override
	protected ValueID[] getTestData() {
		final ValueID first = new ValueID(new UUID(0, 0));
		final ValueID second = new ValueID(new UUID(1, 0));
		final ValueID third = new ValueID(new UUID(1, 1));
		return new ValueID[] {first, second, third};
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import java.util.UUID;
/**
 * Runs the {@link ComparatorTestBase} checks against a {@link WritableComparator}
 * for the UUID-backed {@link WritableID} type.
 */
public class WritableComparatorUUIDTest extends ComparatorTestBase<WritableID> {

	/** Creates the comparator under test for the requested sort direction. */
	@Override
	protected TypeComparator<WritableID> createComparator(boolean ascending) {
		final TypeComparator<WritableID> comparator =
				new WritableComparator<WritableID>(ascending, WritableID.class);
		return comparator;
	}

	/** Creates the serializer that matches the comparator's element type. */
	@Override
	protected TypeSerializer<WritableID> createSerializer() {
		final TypeSerializer<WritableID> serializer =
				new WritableSerializer<WritableID>(WritableID.class);
		return serializer;
	}

	/** Supplies three IDs listed in ascending UUID order. */
	@Override
	protected WritableID[] getSortedTestData() {
		final WritableID smallest = new WritableID(new UUID(0, 0));
		final WritableID middle = new WritableID(new UUID(1, 0));
		final WritableID largest = new WritableID(new UUID(1, 1));
		return new WritableID[] {smallest, middle, largest};
	}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.UUID;
/**
 * Test fixture: a Hadoop {@link WritableComparable} that wraps a {@link UUID} and
 * orders, compares and hashes by that UUID.
 */
public class WritableID implements WritableComparable<WritableID> {

	private UUID uuid;

	/** Creates an ID backed by a freshly generated random UUID. */
	public WritableID() {
		uuid = UUID.randomUUID();
	}

	/**
	 * Creates an ID backed by the given UUID.
	 *
	 * @param uuid UUID to wrap
	 */
	public WritableID(UUID uuid) {
		this.uuid = uuid;
	}

	/** Orders IDs by the natural ordering of their UUIDs. */
	@Override
	public int compareTo(WritableID o) {
		final UUID mine = this.uuid;
		return mine.compareTo(o.uuid);
	}

	/** Writes the UUID as two longs: most significant bits first, then least significant. */
	@Override
	public void write(DataOutput dataOutput) throws IOException {
		final long high = uuid.getMostSignificantBits();
		dataOutput.writeLong(high);
		final long low = uuid.getLeastSignificantBits();
		dataOutput.writeLong(low);
	}

	/** Replaces the UUID with one rebuilt from two longs, in the order written by {@link #write}. */
	@Override
	public void readFields(DataInput dataInput) throws IOException {
		final long high = dataInput.readLong();
		final long low = dataInput.readLong();
		uuid = new UUID(high, low);
	}

	/** Returns the textual form of the wrapped UUID. */
	@Override
	public String toString() {
		return uuid.toString();
	}

	/** Two IDs are equal when they have the same class and equal (or both null) UUIDs. */
	@Override
	public boolean equals(Object o) {
		if (o == this) {
			return true;
		}
		if (o == null || o.getClass() != getClass()) {
			return false;
		}
		final WritableID that = (WritableID) o;
		if (uuid == null) {
			return that.uuid == null;
		}
		return uuid.equals(that.uuid);
	}

	/** Hashes by the wrapped UUID; a null UUID hashes to 0. */
	@Override
	public int hashCode() {
		if (uuid == null) {
			return 0;
		}
		return uuid.hashCode();
	}
}
......@@ -47,5 +47,4 @@ public class WritableSerializerTest {
testInstance.testAll();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import java.util.UUID;
/**
 * Runs the {@link SerializerTestBase} checks against a {@link WritableSerializer}
 * for the UUID-backed {@link WritableID} type.
 */
public class WritableSerializerUUIDTest extends SerializerTestBase<WritableID> {

	/** Creates the serializer under test. */
	@Override
	protected TypeSerializer<WritableID> createSerializer() {
		final TypeSerializer<WritableID> serializer =
				new WritableSerializer<WritableID>(WritableID.class);
		return serializer;
	}

	/**
	 * Reports the expected serialized length.
	 * NOTE(review): -1 presumably tells the test base that there is no fixed length - confirm.
	 */
	@Override
	protected int getLength() {
		final int length = -1;
		return length;
	}

	/** Names the element type handled by the serializer under test. */
	@Override
	protected Class<WritableID> getTypeClass() {
		final Class<WritableID> typeClass = WritableID.class;
		return typeClass;
	}

	/** Supplies three IDs built from fixed UUIDs. */
	@Override
	protected WritableID[] getTestData() {
		final WritableID first = new WritableID(new UUID(0, 0));
		final WritableID second = new WritableID(new UUID(1, 0));
		final WritableID third = new WritableID(new UUID(1, 1));
		return new WritableID[] {first, second, third};
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册