提交 76d4a75e 编写于 作者: S Stephan Ewen

[FLINK-1005] Extend TypeSerializer interface to handle non-mutable object...

[FLINK-1005] Extend TypeSerializer interface to handle non-mutable object deserialization more efficiently.
上级 ea4c8828
......@@ -62,6 +62,15 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
throw new RuntimeException("Cannot instantiate StreamRecord.", e);
}
}
@Override
public StreamRecord<T> copy(StreamRecord<T> from) {
StreamRecord<T> rec = new StreamRecord<T>();
rec.isTuple = from.isTuple;
rec.setId(from.getId().copy());
rec.setObject(typeSerializer.copy(from.getObject()));
return rec;
}
@Override
public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
......@@ -81,10 +90,18 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
value.getId().write(target);
typeSerializer.serialize(value.getObject(), target);
}
@Override
public StreamRecord<T> deserialize(DataInputView source) throws IOException {
StreamRecord<T> record = new StreamRecord<T>();
record.isTuple = this.isTuple;
record.getId().read(source);
record.setObject(typeSerializer.deserialize(source));
return record;
}
@Override
public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source)
throws IOException {
public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
reuse.getId().read(source);
reuse.setObject(typeSerializer.deserialize(reuse.getObject(), source));
return reuse;
......@@ -94,5 +111,4 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
public void copy(DataInputView source, DataOutputView target) throws IOException {
// Needs to be implemented
}
}
......@@ -73,10 +73,21 @@ public abstract class TypeSerializer<T> implements Serializable {
public abstract T createInstance();
/**
* Creates a copy from the given element, storing the copied result in the given reuse element if type is mutable.
* Creates a deep copy of the given element in a new element.
*
* @param from The element reuse be copied.
* @param reuse The element to be reused.
* @return A deep copy of the element.
*/
public abstract T copy(T from);
/**
* Creates a copy from the given element.
* The method makes an attempt to store the copy in the given reuse element, if the type is mutable.
* This is, however, not guaranteed.
*
* @param from The element to be copied.
* @param reuse The element to be reused. May or may not be used.
* @return A deep copy of the element.
*/
public abstract T copy(T from, T reuse);
......@@ -102,11 +113,23 @@ public abstract class TypeSerializer<T> implements Serializable {
*/
public abstract void serialize(T record, DataOutputView target) throws IOException;
/**
* De-serializes a record from the given source input view.
*
* @param source The input view from which to read the data.
* @result The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
*/
public abstract T deserialize(DataInputView source) throws IOException;
/**
* De-serializes a record from the given source input view into the given reuse record instance if mutable.
*
* @param reuse The record instance into which to de-serialize the data.
* @param source The input view from which to read the data.
* @result The deserialized element.
*
* @throws IOException Thrown, if the de-serialization encountered an I/O related error. Typically raised by the
* input view, which may have an underlying I/O channel from which it reads.
......@@ -126,19 +149,4 @@ public abstract class TypeSerializer<T> implements Serializable {
* @throws IOException Thrown if any of the two views raises an exception.
*/
public abstract void copy(DataInputView source, DataOutputView target) throws IOException;
// --------------------------------------------------------------------------------------------
// Default Utilities: Hash code and equals are pre-defined for singleton serializers, where
// all instances are equal
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return getClass().hashCode();
}
@Override
public boolean equals(Object obj) {
return obj != null && obj.getClass() == this.getClass();
}
}
......@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class BooleanSerializer extends TypeSerializer<Boolean> {
public final class BooleanSerializer extends TypeSerializerSingleton<Boolean> {
private static final long serialVersionUID = 1L;
......@@ -48,6 +46,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
return FALSE;
}
@Override
public Boolean copy(Boolean from) {
return from;
}
@Override
public Boolean copy(Boolean from, Boolean reuse) {
return from;
......@@ -63,6 +66,11 @@ public class BooleanSerializer extends TypeSerializer<Boolean> {
target.writeBoolean(record.booleanValue());
}
@Override
public Boolean deserialize(DataInputView source) throws IOException {
return Boolean.valueOf(source.readBoolean());
}
@Override
public Boolean deserialize(Boolean reuse, DataInputView source) throws IOException {
return Boolean.valueOf(source.readBoolean());
......
......@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.BooleanValue;
public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
public final class BooleanValueSerializer extends TypeSerializerSingleton<BooleanValue> {
private static final long serialVersionUID = 1L;
......@@ -48,6 +47,13 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
return new BooleanValue();
}
@Override
public BooleanValue copy(BooleanValue from) {
BooleanValue result = new BooleanValue();
result.setValue(from.getValue());
return result;
}
@Override
public BooleanValue copy(BooleanValue from, BooleanValue reuse) {
reuse.setValue(from.getValue());
......@@ -64,6 +70,11 @@ public class BooleanValueSerializer extends TypeSerializer<BooleanValue> {
record.write(target);
}
@Override
public BooleanValue deserialize(DataInputView source) throws IOException {
return deserialize(new BooleanValue(), source);
}
@Override
public BooleanValue deserialize(BooleanValue reuse, DataInputView source) throws IOException {
reuse.read(source);
......
......@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class ByteSerializer extends TypeSerializer<Byte> {
public final class ByteSerializer extends TypeSerializerSingleton<Byte> {
private static final long serialVersionUID = 1L;
......@@ -49,6 +48,11 @@ public class ByteSerializer extends TypeSerializer<Byte> {
return ZERO;
}
@Override
public Byte copy(Byte from) {
return from;
}
@Override
public Byte copy(Byte from, Byte reuse) {
return from;
......@@ -65,9 +69,14 @@ public class ByteSerializer extends TypeSerializer<Byte> {
}
@Override
public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
public Byte deserialize(DataInputView source) throws IOException {
return Byte.valueOf(source.readByte());
}
@Override
public Byte deserialize(Byte reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
......
......@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.ByteValue;
public class ByteValueSerializer extends TypeSerializer<ByteValue> {
public final class ByteValueSerializer extends TypeSerializerSingleton<ByteValue> {
private static final long serialVersionUID = 1L;
......@@ -48,6 +47,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
return new ByteValue();
}
@Override
public ByteValue copy(ByteValue from) {
return copy(from, new ByteValue());
}
@Override
public ByteValue copy(ByteValue from, ByteValue reuse) {
reuse.setValue(from.getValue());
......@@ -64,6 +68,11 @@ public class ByteValueSerializer extends TypeSerializer<ByteValue> {
record.write(target);
}
@Override
public ByteValue deserialize(DataInputView source) throws IOException {
return deserialize(new ByteValue(), source);
}
@Override
public ByteValue deserialize(ByteValue reuse, DataInputView source) throws IOException {
reuse.read(source);
......
......@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class CharSerializer extends TypeSerializer<Character> {
public final class CharSerializer extends TypeSerializerSingleton<Character> {
private static final long serialVersionUID = 1L;
......@@ -49,6 +48,11 @@ public class CharSerializer extends TypeSerializer<Character> {
return ZERO;
}
@Override
public Character copy(Character from) {
return from;
}
@Override
public Character copy(Character from, Character reuse) {
return from;
......@@ -65,9 +69,14 @@ public class CharSerializer extends TypeSerializer<Character> {
}
@Override
public Character deserialize(Character reuse, DataInputView source) throws IOException {
public Character deserialize(DataInputView source) throws IOException {
return Character.valueOf(source.readChar());
}
@Override
public Character deserialize(Character reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
......
......@@ -20,13 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.CharValue;
public class CharValueSerializer extends TypeSerializer<CharValue> {
public class CharValueSerializer extends TypeSerializerSingleton<CharValue> {
private static final long serialVersionUID = 1L;
......@@ -47,6 +45,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
public CharValue createInstance() {
return new CharValue();
}
@Override
public CharValue copy(CharValue from) {
return copy(from, new CharValue());
}
@Override
public CharValue copy(CharValue from, CharValue reuse) {
......@@ -63,6 +66,11 @@ public class CharValueSerializer extends TypeSerializer<CharValue> {
public void serialize(CharValue record, DataOutputView target) throws IOException {
record.write(target);
}
@Override
public CharValue deserialize(DataInputView source) throws IOException {
return deserialize(new CharValue(), source);
}
@Override
public CharValue deserialize(CharValue reuse, DataInputView source) throws IOException {
......
......@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class DoubleSerializer extends TypeSerializer<Double> {
public final class DoubleSerializer extends TypeSerializerSingleton<Double> {
private static final long serialVersionUID = 1L;
......@@ -49,6 +47,11 @@ public class DoubleSerializer extends TypeSerializer<Double> {
return ZERO;
}
@Override
public Double copy(Double from) {
return from;
}
@Override
public Double copy(Double from, Double reuse) {
return from;
......@@ -65,9 +68,14 @@ public class DoubleSerializer extends TypeSerializer<Double> {
}
@Override
public Double deserialize(Double reuse, DataInputView source) throws IOException {
public Double deserialize(DataInputView source) throws IOException {
return Double.valueOf(source.readDouble());
}
@Override
public Double deserialize(Double reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
......
......@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.DoubleValue;
public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
public final class DoubleValueSerializer extends TypeSerializerSingleton<DoubleValue> {
private static final long serialVersionUID = 1L;
......@@ -48,6 +47,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
return new DoubleValue();
}
@Override
public DoubleValue copy(DoubleValue from) {
return copy(from, new DoubleValue());
}
@Override
public DoubleValue copy(DoubleValue from, DoubleValue reuse) {
reuse.setValue(from.getValue());
......@@ -64,6 +68,11 @@ public class DoubleValueSerializer extends TypeSerializer<DoubleValue> {
record.write(target);
}
@Override
public DoubleValue deserialize(DataInputView source) throws IOException {
return deserialize(new DoubleValue(), source);
}
@Override
public DoubleValue deserialize(DoubleValue reuse, DataInputView source) throws IOException {
reuse.read(source);
......
......@@ -20,12 +20,10 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class FloatSerializer extends TypeSerializer<Float> {
public final class FloatSerializer extends TypeSerializerSingleton<Float> {
private static final long serialVersionUID = 1L;
......@@ -49,6 +47,11 @@ public class FloatSerializer extends TypeSerializer<Float> {
return ZERO;
}
@Override
public Float copy(Float from) {
return from;
}
@Override
public Float copy(Float from, Float reuse) {
return from;
......@@ -65,9 +68,14 @@ public class FloatSerializer extends TypeSerializer<Float> {
}
@Override
public Float deserialize(Float reuse, DataInputView source) throws IOException {
public Float deserialize(DataInputView source) throws IOException {
return Float.valueOf(source.readFloat());
}
@Override
public Float deserialize(Float reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
......
......@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.FloatValue;
public class FloatValueSerializer extends TypeSerializer<FloatValue> {
public class FloatValueSerializer extends TypeSerializerSingleton<FloatValue> {
private static final long serialVersionUID = 1L;
......@@ -48,6 +47,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
return new FloatValue();
}
@Override
public FloatValue copy(FloatValue from) {
return copy(from, new FloatValue());
}
@Override
public FloatValue copy(FloatValue from, FloatValue reuse) {
reuse.setValue(from.getValue());
......@@ -64,6 +68,11 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
record.write(target);
}
@Override
public FloatValue deserialize(DataInputView source) throws IOException {
return deserialize(new FloatValue(), source);
}
@Override
public FloatValue deserialize(FloatValue reuse, DataInputView source) throws IOException {
reuse.read(source);
......@@ -72,6 +81,6 @@ public class FloatValueSerializer extends TypeSerializer<FloatValue> {
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.writeDouble(source.readFloat());
target.writeFloat(source.readFloat());
}
}
......@@ -27,9 +27,11 @@ import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for arrays of objects.
*
* @param <C> The component type
*/
public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
public final class GenericArraySerializer<C> extends TypeSerializer<C[]> {
private static final long serialVersionUID = 1L;
......@@ -40,7 +42,6 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
private final C[] EMPTY;
public GenericArraySerializer(Class<C> componentClass, TypeSerializer<C> componentSerializer) {
if (componentClass == null || componentSerializer == null) {
throw new NullPointerException();
......@@ -68,7 +69,7 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
}
@Override
public C[] copy(C[] from, C[] reuse) {
public C[] copy(C[] from) {
C[] copy = create(from.length);
for (int i = 0; i < copy.length; i++) {
......@@ -77,6 +78,11 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
return copy;
}
@Override
public C[] copy(C[] from, C[] reuse) {
return copy(from);
}
@Override
public int getLength() {
......@@ -97,6 +103,24 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
}
}
@Override
public C[] deserialize(DataInputView source) throws IOException {
int len = source.readInt();
C[] array = create(len);
for (int i = 0; i < len; i++) {
boolean isNonNull = source.readBoolean();
if (isNonNull) {
array[i] = componentSerializer.deserialize(source);
} else {
array[i] = null;
}
}
return array;
}
@Override
public C[] deserialize(C[] reuse, DataInputView source) throws IOException {
int len = source.readInt();
......@@ -108,7 +132,13 @@ public class GenericArraySerializer<C> extends TypeSerializer<C[]> {
for (int i = 0; i < len; i++) {
boolean isNonNull = source.readBoolean();
if (isNonNull) {
reuse[i] = componentSerializer.deserialize(componentSerializer.createInstance(), source);
C ri = reuse[i];
if (ri == null) {
ri = componentSerializer.deserialize(source);
} else {
ri = componentSerializer.deserialize(ri, source);
}
reuse[i] = ri;
} else {
reuse[i] = null;
}
......
......@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class IntSerializer extends TypeSerializer<Integer> {
public final class IntSerializer extends TypeSerializerSingleton<Integer> {
private static final long serialVersionUID = 1L;
......@@ -49,6 +48,11 @@ public class IntSerializer extends TypeSerializer<Integer> {
return ZERO;
}
@Override
public Integer copy(Integer from) {
return from;
}
@Override
public Integer copy(Integer from, Integer reuse) {
return from;
......@@ -65,9 +69,14 @@ public class IntSerializer extends TypeSerializer<Integer> {
}
@Override
public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
public Integer deserialize(DataInputView source) throws IOException {
return Integer.valueOf(source.readInt());
}
@Override
public Integer deserialize(Integer reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
......
......@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.IntValue;
public class IntValueSerializer extends TypeSerializer<IntValue> {
public final class IntValueSerializer extends TypeSerializerSingleton<IntValue> {
private static final long serialVersionUID = 1L;
......@@ -48,6 +47,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
return new IntValue();
}
@Override
public IntValue copy(IntValue from) {
return copy(from, new IntValue());
}
@Override
public IntValue copy(IntValue from, IntValue reuse) {
reuse.setValue(from.getValue());
......@@ -64,6 +68,11 @@ public class IntValueSerializer extends TypeSerializer<IntValue> {
record.write(target);
}
@Override
public IntValue deserialize(DataInputView source) throws IOException {
return deserialize(new IntValue(), source);
}
@Override
public IntValue deserialize(IntValue reuse, DataInputView source) throws IOException {
reuse.read(source);
......
......@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class LongSerializer extends TypeSerializer<Long> {
public final class LongSerializer extends TypeSerializerSingleton<Long> {
private static final long serialVersionUID = 1L;
......@@ -49,6 +48,11 @@ public class LongSerializer extends TypeSerializer<Long> {
return ZERO;
}
@Override
public Long copy(Long from) {
return from;
}
@Override
public Long copy(Long from, Long reuse) {
return from;
......@@ -65,9 +69,14 @@ public class LongSerializer extends TypeSerializer<Long> {
}
@Override
public Long deserialize(Long reuse, DataInputView source) throws IOException {
public Long deserialize(DataInputView source) throws IOException {
return Long.valueOf(source.readLong());
}
@Override
public Long deserialize(Long reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
......
......@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.LongValue;
public class LongValueSerializer extends TypeSerializer<LongValue> {
public final class LongValueSerializer extends TypeSerializerSingleton<LongValue> {
private static final long serialVersionUID = 1L;
......@@ -48,6 +47,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
return new LongValue();
}
@Override
public LongValue copy(LongValue from) {
return copy(from, new LongValue());
}
@Override
public LongValue copy(LongValue from, LongValue reuse) {
reuse.setValue(from.getValue());
......@@ -64,6 +68,11 @@ public class LongValueSerializer extends TypeSerializer<LongValue> {
record.write(target);
}
@Override
public LongValue deserialize(DataInputView source) throws IOException {
return deserialize(new LongValue(), source);
}
@Override
public LongValue deserialize(LongValue reuse, DataInputView source) throws IOException {
reuse.read(source);
......
......@@ -20,12 +20,11 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class ShortSerializer extends TypeSerializer<Short> {
public final class ShortSerializer extends TypeSerializerSingleton<Short> {
private static final long serialVersionUID = 1L;
......@@ -49,6 +48,11 @@ public class ShortSerializer extends TypeSerializer<Short> {
return ZERO;
}
@Override
public Short copy(Short from) {
return from;
}
@Override
public Short copy(Short from, Short reuse) {
return from;
......@@ -65,9 +69,14 @@ public class ShortSerializer extends TypeSerializer<Short> {
}
@Override
public Short deserialize(Short reuse, DataInputView source) throws IOException {
public Short deserialize(DataInputView source) throws IOException {
return Short.valueOf(source.readShort());
}
@Override
public Short deserialize(Short reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
......
......@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.ShortValue;
public class ShortValueSerializer extends TypeSerializer<ShortValue> {
public final class ShortValueSerializer extends TypeSerializerSingleton<ShortValue> {
private static final long serialVersionUID = 1L;
......@@ -48,6 +47,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
return new ShortValue();
}
@Override
public ShortValue copy(ShortValue from) {
return copy(from, new ShortValue());
}
@Override
public ShortValue copy(ShortValue from, ShortValue reuse) {
reuse.setValue(from.getValue());
......@@ -64,6 +68,11 @@ public class ShortValueSerializer extends TypeSerializer<ShortValue> {
record.write(target);
}
@Override
public ShortValue deserialize(DataInputView source) throws IOException {
return deserialize(new ShortValue(), source);
}
@Override
public ShortValue deserialize(ShortValue reuse, DataInputView source) throws IOException {
reuse.read(source);
......
......@@ -20,13 +20,12 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
public class StringSerializer extends TypeSerializer<String> {
public final class StringSerializer extends TypeSerializerSingleton<String> {
private static final long serialVersionUID = 1L;
......@@ -49,6 +48,11 @@ public class StringSerializer extends TypeSerializer<String> {
return EMPTY;
}
@Override
public String copy(String from) {
return from;
}
@Override
public String copy(String from, String reuse) {
return from;
......@@ -65,9 +69,14 @@ public class StringSerializer extends TypeSerializer<String> {
}
@Override
public String deserialize(String record, DataInputView source) throws IOException {
public String deserialize(DataInputView source) throws IOException {
return StringValue.readString(source);
}
@Override
public String deserialize(String record, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
......
......@@ -20,16 +20,17 @@ package org.apache.flink.api.common.typeutils.base;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
public class StringValueSerializer extends TypeSerializer<StringValue> {
public final class StringValueSerializer extends TypeSerializerSingleton<StringValue> {
private static final long serialVersionUID = 1L;
private static final int HIGH_BIT = 0x1 << 7;
public static final StringValueSerializer INSTANCE = new StringValueSerializer();
......@@ -48,6 +49,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
return new StringValue();
}
@Override
public StringValue copy(StringValue from) {
return copy(from, new StringValue());
}
@Override
public StringValue copy(StringValue from, StringValue reuse) {
reuse.setValue(from);
......@@ -64,6 +70,11 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
record.write(target);
}
@Override
public StringValue deserialize(DataInputView source) throws IOException {
return deserialize(new StringValue(), source);
}
@Override
public StringValue deserialize(StringValue reuse, DataInputView source) throws IOException {
reuse.read(source);
......@@ -72,6 +83,29 @@ public class StringValueSerializer extends TypeSerializer<StringValue> {
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
StringValue.copyString(source, target);
int len = source.readUnsignedByte();
target.writeByte(len);
if (len >= HIGH_BIT) {
int shift = 7;
int curr;
len = len & 0x7f;
while ((curr = source.readUnsignedByte()) >= HIGH_BIT) {
target.writeByte(curr);
len |= (curr & 0x7f) << shift;
shift += 7;
}
target.writeByte(curr);
len |= curr << shift;
}
for (int i = 0; i < len; i++) {
int c = source.readUnsignedByte();
target.writeByte(c);
while (c >= HIGH_BIT) {
c = source.readUnsignedByte();
target.writeByte(c);
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.TypeSerializer;
public abstract class TypeSerializerSingleton<T> extends TypeSerializer<T>{
private static final long serialVersionUID = 8766687317209282373L;
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
return obj != null && obj.getClass() == this.getClass();
}
}
......@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
* A serializer for boolean arrays.
*/
public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
public final class BooleanPrimitiveArraySerializer extends TypeSerializerSingleton<boolean[]>{
private static final long serialVersionUID = 1L;
......@@ -52,12 +52,17 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
}
@Override
public boolean[] copy(boolean[] from, boolean[] reuse) {
public boolean[] copy(boolean[] from) {
boolean[] copy = new boolean[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
public boolean[] copy(boolean[] from, boolean[] reuse) {
return copy(from);
}
@Override
public int getLength() {
return -1;
......@@ -79,15 +84,20 @@ public class BooleanPrimitiveArraySerializer extends TypeSerializer<boolean[]>{
@Override
public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
public boolean[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
reuse = new boolean[len];
boolean[] result = new boolean[len];
for (int i = 0; i < len; i++) {
reuse[i] = source.readBoolean();
result[i] = source.readBoolean();
}
return reuse;
return result;
}
@Override
public boolean[] deserialize(boolean[] reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......
......@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
* A serializer for byte arrays.
*/
public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
public final class BytePrimitiveArraySerializer extends TypeSerializerSingleton<byte[]>{
private static final long serialVersionUID = 1L;
......@@ -51,11 +51,16 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
}
@Override
public byte[] copy(byte[] from, byte[] reuse) {
public byte[] copy(byte[] from) {
byte[] copy = new byte[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
public byte[] copy(byte[] from, byte[] reuse) {
return copy(from);
}
@Override
public int getLength() {
......@@ -74,13 +79,17 @@ public class BytePrimitiveArraySerializer extends TypeSerializer<byte[]>{
target.write(record);
}
@Override
public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
public byte[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
reuse = new byte[len];
source.readFully(reuse);
return reuse;
byte[] result = new byte[len];
source.readFully(result);
return result;
}
@Override
public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......
......@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
* A serializer for char arrays.
*/
public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
public final class CharPrimitiveArraySerializer extends TypeSerializerSingleton<char[]>{
private static final long serialVersionUID = 1L;
......@@ -52,11 +52,16 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
}
@Override
public char[] copy(char[] from, char[] reuse) {
public char[] copy(char[] from) {
char[] copy = new char[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
public char[] copy(char[] from, char[] reuse) {
return copy(from);
}
@Override
public int getLength() {
......@@ -77,17 +82,21 @@ public class CharPrimitiveArraySerializer extends TypeSerializer<char[]>{
}
}
@Override
public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
public char[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
reuse = new char[len];
char[] result = new char[len];
for (int i = 0; i < len; i++) {
reuse[i] = source.readChar();
result[i] = source.readChar();
}
return reuse;
return result;
}
@Override
public char[] deserialize(char[] reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......
......@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
* A serializer for double arrays.
*/
public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
public final class DoublePrimitiveArraySerializer extends TypeSerializerSingleton<double[]>{
private static final long serialVersionUID = 1L;
......@@ -50,14 +50,19 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
public double[] createInstance() {
return EMPTY;
}
@Override
public double[] copy(double[] from, double[] reuse) {
public double[] copy(double[] from) {
double[] copy = new double[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
public double[] copy(double[] from, double[] reuse) {
return copy(from);
}
@Override
public int getLength() {
return -1;
......@@ -77,17 +82,21 @@ public class DoublePrimitiveArraySerializer extends TypeSerializer<double[]>{
}
}
@Override
public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
public double[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
reuse = new double[len];
double[] result = new double[len];
for (int i = 0; i < len; i++) {
reuse[i] = source.readDouble();
result[i] = source.readDouble();
}
return reuse;
return result;
}
@Override
public double[] deserialize(double[] reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......
......@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
* A serializer for float arrays.
*/
public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
public final class FloatPrimitiveArraySerializer extends TypeSerializerSingleton<float[]>{
private static final long serialVersionUID = 1L;
......@@ -52,11 +52,16 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
}
@Override
public float[] copy(float[] from, float[] reuse) {
public float[] copy(float[] from) {
float[] copy = new float[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
public float[] copy(float[] from, float[] reuse) {
return copy(from);
}
@Override
public int getLength() {
......@@ -77,17 +82,21 @@ public class FloatPrimitiveArraySerializer extends TypeSerializer<float[]>{
}
}
@Override
public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
public float[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
reuse = new float[len];
float[] result = new float[len];
for (int i = 0; i < len; i++) {
reuse[i] = source.readFloat();
result[i] = source.readFloat();
}
return reuse;
return result;
}
@Override
public float[] deserialize(float[] reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......
......@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
* A serializer for int arrays.
*/
public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
public class IntPrimitiveArraySerializer extends TypeSerializerSingleton<int[]>{
private static final long serialVersionUID = 1L;
......@@ -52,11 +52,16 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
}
@Override
public int[] copy(int[] from, int[] reuse) {
public int[] copy(int[] from) {
int[] copy = new int[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
public int[] copy(int[] from, int[] reuse) {
return copy(from);
}
@Override
public int getLength() {
......@@ -77,17 +82,21 @@ public class IntPrimitiveArraySerializer extends TypeSerializer<int[]>{
}
}
@Override
public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
public int[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
reuse = new int[len];
int[] result = new int[len];
for (int i = 0; i < len; i++) {
reuse[i] = source.readInt();
result[i] = source.readInt();
}
return reuse;
return result;
}
@Override
public int[] deserialize(int[] reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......
......@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
*/
public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
public final class LongPrimitiveArraySerializer extends TypeSerializerSingleton<long[]>{
private static final long serialVersionUID = 1L;
......@@ -51,11 +51,16 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
return EMPTY;
}
@Override
public long[] copy(long[] from) {
long[] result = new long[from.length];
System.arraycopy(from, 0, result, 0, from.length);
return result;
}
@Override
public long[] copy(long[] from, long[] reuse) {
reuse = new long[from.length];
System.arraycopy(from, 0, reuse, 0, from.length);
return reuse;
return copy(from);
}
@Override
......@@ -77,17 +82,21 @@ public class LongPrimitiveArraySerializer extends TypeSerializer<long[]>{
}
}
@Override
public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
public long[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
reuse = new long[len];
long[] array = new long[len];
for (int i = 0; i < len; i++) {
reuse[i] = source.readLong();
array[i] = source.readLong();
}
return reuse;
return array;
}
@Override
public long[] deserialize(long[] reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......
......@@ -20,14 +20,14 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A serializer for long arrays.
* A serializer for short arrays.
*/
public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton<short[]>{
private static final long serialVersionUID = 1L;
......@@ -52,11 +52,16 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
}
@Override
public short[] copy(short[] from, short[] reuse) {
public short[] copy(short[] from) {
short[] copy = new short[from.length];
System.arraycopy(from, 0, copy, 0, from.length);
return copy;
}
@Override
public short[] copy(short[] from, short[] reuse) {
return copy(from);
}
@Override
public int getLength() {
......@@ -77,17 +82,21 @@ public class ShortPrimitiveArraySerializer extends TypeSerializer<short[]>{
}
}
@Override
public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
public short[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
reuse = new short[len];
short[] array = new short[len];
for (int i = 0; i < len; i++) {
reuse[i] = source.readShort();
array[i] = source.readShort();
}
return reuse;
return array;
}
@Override
public short[] deserialize(short[] reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......
......@@ -20,7 +20,7 @@ package org.apache.flink.api.common.typeutils.base.array;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;
......@@ -29,7 +29,7 @@ import org.apache.flink.types.StringValue;
/**
* A serializer for String arrays. Specialized for efficiency.
*/
public class StringArraySerializer extends TypeSerializer<String[]>{
public final class StringArraySerializer extends TypeSerializerSingleton<String[]>{
private static final long serialVersionUID = 1L;
......@@ -53,11 +53,16 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
return EMPTY;
}
@Override
public String[] copy(String[] from) {
String[] target = new String[from.length];
System.arraycopy(from, 0, target, 0, from.length);
return target;
}
@Override
public String[] copy(String[] from, String[] reuse) {
reuse = new String[from.length];
System.arraycopy(from, 0, reuse, 0, from.length);
return reuse;
return copy(from);
}
@Override
......@@ -65,7 +70,6 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
return -1;
}
@Override
public void serialize(String[] record, DataOutputView target) throws IOException {
if (record == null) {
......@@ -79,17 +83,21 @@ public class StringArraySerializer extends TypeSerializer<String[]>{
}
}
@Override
public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
public String[] deserialize(DataInputView source) throws IOException {
final int len = source.readInt();
reuse = new String[len];
String[] array = new String[len];
for (int i = 0; i < len; i++) {
reuse[i] = StringValue.readString(source);
array[i] = StringValue.readString(source);
}
return reuse;
return array;
}
@Override
public String[] deserialize(String[] reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......
......@@ -66,13 +66,17 @@ public final class RecordSerializer extends TypeSerializer<Record> {
return new Record();
}
@Override
public Record copy(Record from) {
return from.createCopy();
}
@Override
public Record copy(Record from, Record reuse) {
from.copyTo(reuse);
return reuse;
}
@Override
public int getLength() {
return -1;
......@@ -85,6 +89,11 @@ public final class RecordSerializer extends TypeSerializer<Record> {
record.serialize(target);
}
@Override
public Record deserialize(DataInputView source) throws IOException {
return deserialize(new Record(), source);
}
@Override
public Record deserialize(Record target, DataInputView source) throws IOException {
target.deserialize(source);
......
......@@ -88,6 +88,24 @@ public abstract class SerializerTestBase<T> {
}
}
@Test
public void testCopy() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
for (T datum : testData) {
T copy = serializer.copy(datum);
deepEquals("Copied element is not equal to the original element.", datum, copy);
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testCopyIntoNewElements() {
try {
......@@ -184,7 +202,36 @@ public abstract class SerializerTestBase<T> {
}
@Test
public void testSerializeAsSequence() {
public void testSerializeAsSequenceNoReuse() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
TestOutputView out = new TestOutputView();
for (T value : testData) {
serializer.serialize(value, out);
}
TestInputView in = out.getInputView();
int num = 0;
while (in.available() > 0) {
T deserialized = serializer.deserialize(in);
deepEquals("Deserialized value if wrong.", testData[num], deserialized);
num++;
}
assertEquals("Wrong number of elements deserialized.", testData.length, num);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
@Test
public void testSerializeAsSequenceReusingValues() {
try {
TypeSerializer<T> serializer = getSerializer();
T[] testData = getData();
......
......@@ -69,11 +69,13 @@ public class
public void testAll() {
testInstantiate();
testGetLength();
testCopy();
testCopyIntoNewElements();
testCopyIntoReusedElements();
testSerializeIndividually();
testSerializeIndividuallyReusingValues();
testSerializeAsSequence();
testSerializeAsSequenceNoReuse();
testSerializeAsSequenceReusingValues();
testSerializedCopyIndividually();
testSerializedCopyAsSequence();
testSerializabilityAndEquals();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import java.util.Random;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.BooleanValue;
/**
* A test for the {@link BooleanValueSerializer}.
*/
public class BooleanValueSerializerTest extends SerializerTestBase<BooleanValue> {
@Override
protected TypeSerializer<BooleanValue> createSerializer() {
return new BooleanValueSerializer();
}
@Override
protected int getLength() {
return 1;
}
@Override
protected Class<BooleanValue> getTypeClass() {
return BooleanValue.class;
}
@Override
protected BooleanValue[] getTestData() {
Random rnd = new Random(874597969123412341L);
return new BooleanValue[] {new BooleanValue(true), new BooleanValue(false),
new BooleanValue(rnd.nextBoolean()),
new BooleanValue(rnd.nextBoolean()),
new BooleanValue(rnd.nextBoolean())};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import java.util.Random;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.ByteValue;
/**
* A test for the {@link ByteValueSerializer}.
*/
public class ByteValueSerializerTest extends SerializerTestBase<ByteValue> {
@Override
protected TypeSerializer<ByteValue> createSerializer() {
return new ByteValueSerializer();
}
@Override
protected int getLength() {
return 1;
}
@Override
protected Class<ByteValue> getTypeClass() {
return ByteValue.class;
}
@Override
protected ByteValue[] getTestData() {
Random rnd = new Random(874597969123412341L);
byte byteArray[] = new byte[1];
rnd.nextBytes(byteArray);
return new ByteValue[] {new ByteValue((byte) 0), new ByteValue((byte) 1), new ByteValue((byte) -1),
new ByteValue(Byte.MAX_VALUE), new ByteValue(Byte.MIN_VALUE),
new ByteValue(byteArray[0]), new ByteValue((byte) -byteArray[0])};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import java.util.Random;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.CharValue;
/**
* A test for the {@link CharValueSerializer}.
*/
public class CharValueSerializerTest extends SerializerTestBase<CharValue> {
@Override
protected TypeSerializer<CharValue> createSerializer() {
return new CharValueSerializer();
}
@Override
protected int getLength() {
return 2;
}
@Override
protected Class<CharValue> getTypeClass() {
return CharValue.class;
}
@Override
protected CharValue[] getTestData() {
Random rnd = new Random(874597969123412341L);
int rndInt = rnd.nextInt((int) Character.MAX_VALUE);
return new CharValue[] {new CharValue('a'), new CharValue('@'), new CharValue('ä'),
new CharValue('1'), new CharValue((char) rndInt),
new CharValue(Character.MAX_VALUE), new CharValue(Character.MIN_VALUE)};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import java.util.Random;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.DoubleValue;
/**
* A test for the {@link DoubleValueSerializer}.
*/
public class DoubleValueSerializerTest extends SerializerTestBase<DoubleValue> {
@Override
protected TypeSerializer<DoubleValue> createSerializer() {
return new DoubleValueSerializer();
}
@Override
protected int getLength() {
return 8;
}
@Override
protected Class<DoubleValue> getTypeClass() {
return DoubleValue.class;
}
@Override
protected DoubleValue[] getTestData() {
Random rnd = new Random(874597969123412341L);
Double rndDouble = rnd.nextDouble() * Double.MAX_VALUE;
return new DoubleValue[] {new DoubleValue(0), new DoubleValue(1), new DoubleValue(-1),
new DoubleValue(Double.MAX_VALUE), new DoubleValue(Double.MIN_VALUE),
new DoubleValue(rndDouble), new DoubleValue(-rndDouble),
new DoubleValue(Double.NaN),
new DoubleValue(Double.NEGATIVE_INFINITY), new DoubleValue(Double.POSITIVE_INFINITY)};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import java.util.Random;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.FloatValue;
/**
* A test for the {@link FloatValueSerializer}.
*/
public class FloatValueSerializerTest extends SerializerTestBase<FloatValue> {
@Override
protected TypeSerializer<FloatValue> createSerializer() {
return new FloatValueSerializer();
}
@Override
protected int getLength() {
return 4;
}
@Override
protected Class<FloatValue> getTypeClass() {
return FloatValue.class;
}
@Override
protected FloatValue[] getTestData() {
Random rnd = new Random(874597969123412341L);
float rndFloat = rnd.nextFloat() * Float.MAX_VALUE;
return new FloatValue[] {new FloatValue(0), new FloatValue(1), new FloatValue(-1),
new FloatValue(Float.MAX_VALUE), new FloatValue(Float.MIN_VALUE),
new FloatValue(rndFloat), new FloatValue(-rndFloat),
new FloatValue(Float.NaN),
new FloatValue(Float.NEGATIVE_INFINITY), new FloatValue(Float.POSITIVE_INFINITY)};
}
}
......@@ -16,60 +16,41 @@
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
import java.util.Random;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.IntValue;
public class IntValueSerializer extends TypeSerializer<IntValue> {
private static final long serialVersionUID = 1L;
@Override
public boolean isImmutableType() {
return false;
}
@Override
public boolean isStateful() {
return false;
}
@Override
public IntValue createInstance() {
return new IntValue();
}
/**
* A test for the {@link IntValueSerializer}.
*/
public class IntValueSerializerTest extends SerializerTestBase<IntValue> {
@Override
public IntValue copy(IntValue from, IntValue reuse) {
reuse.setValue(from.getValue());
return reuse;
protected TypeSerializer<IntValue> createSerializer() {
return new IntValueSerializer();
}
@Override
public int getLength() {
protected int getLength() {
return 4;
}
@Override
public void serialize(IntValue record, DataOutputView target) throws IOException {
target.writeInt(record.getValue());
protected Class<IntValue> getTypeClass() {
return IntValue.class;
}
@Override
public IntValue deserialize(IntValue reuse, DataInputView source) throws IOException {
reuse.setValue(source.readInt());
return reuse;
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 4);
protected IntValue[] getTestData() {
Random rnd = new Random(874597969123412341L);
int rndInt = rnd.nextInt();
return new IntValue[] {new IntValue(0), new IntValue(1), new IntValue(-1),
new IntValue(Integer.MAX_VALUE), new IntValue(Integer.MIN_VALUE),
new IntValue(rndInt), new IntValue(-rndInt)};
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import java.util.Random;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.LongValue;
/**
* A test for the {@link LongValueSerializer}.
*/
public class LongValueSerializerTest extends SerializerTestBase<LongValue> {
@Override
protected TypeSerializer<LongValue> createSerializer() {
return new LongValueSerializer();
}
@Override
protected int getLength() {
return 8;
}
@Override
protected Class<LongValue> getTypeClass() {
return LongValue.class;
}
@Override
protected LongValue[] getTestData() {
Random rnd = new Random(874597969123412341L);
long rndLong = rnd.nextLong();
return new LongValue[] {new LongValue(0L), new LongValue(1L), new LongValue(-1L),
new LongValue(Long.MAX_VALUE), new LongValue(Long.MIN_VALUE),
new LongValue(rndLong), new LongValue(-rndLong)};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import java.util.Random;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.ShortValue;
/**
* A test for the {@link ShortValueSerializer}.
*/
public class ShortValueSerializerTest extends SerializerTestBase<ShortValue> {
@Override
protected TypeSerializer<ShortValue> createSerializer() {
return new ShortValueSerializer();
}
@Override
protected int getLength() {
return 2;
}
@Override
protected Class<ShortValue> getTypeClass() {
return ShortValue.class;
}
@Override
protected ShortValue[] getTestData() {
Random rnd = new Random(874597969123412341L);
int rndInt = rnd.nextInt(32767);
return new ShortValue[] {new ShortValue((short) 0), new ShortValue((short) 1), new ShortValue((short) -1),
new ShortValue((short) rndInt), new ShortValue((short) -rndInt),
new ShortValue((short) -32767), new ShortValue((short) 32768)};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.SerializerTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.StringValue;
/**
* A test for the {@link StringValueSerializer}.
*/
public class StringValueSerializerTest extends SerializerTestBase<StringValue> {
@Override
protected TypeSerializer<StringValue> createSerializer() {
return new StringValueSerializer();
}
@Override
protected int getLength() {
return -1;
}
@Override
protected Class<StringValue> getTypeClass() {
return StringValue.class;
}
@Override
protected StringValue[] getTestData() {
return new StringValue[] {
new StringValue("a"),
new StringValue(""),
new StringValue("bcd"),
new StringValue("jbmbmner8 jhk hj \n \t üäßß@µ"),
new StringValue(""),
new StringValue("non-empty")};
}
}
......@@ -26,4 +26,8 @@
<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
<logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
<logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
<logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
</configuration>
\ No newline at end of file
......@@ -36,7 +36,7 @@ import com.esotericsoftware.kryo.Kryo;
*
* @param <T> The type serialized.
*/
public class AvroSerializer<T> extends TypeSerializer<T> {
public final class AvroSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 1L;
......@@ -88,11 +88,16 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
return InstantiationUtil.instantiate(this.typeToInstantiate);
}
@Override
public T copy(T from) {
checkKryoInitialized();
return this.kryo.copy(from);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
reuse = this.kryo.copy(from);
return reuse;
return this.kryo.copy(from);
}
@Override
......@@ -106,6 +111,13 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
this.encoder.setOut(target);
this.writer.write(value, this.encoder);
}
@Override
public T deserialize(DataInputView source) throws IOException {
checkAvroInitialized();
this.decoder.setIn(source);
return this.reader.read(null, this.decoder);
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
......@@ -146,4 +158,21 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
this.kryo.register(type);
}
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return 0x42fba55c + this.type.hashCode() + this.typeToInstantiate.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj.getClass() == AvroSerializer.class) {
AvroSerializer<?> other = (AvroSerializer<?>) obj;
return this.type == other.type && this.typeToInstantiate == other.typeToInstantiate;
} else {
return false;
}
}
}
......@@ -55,7 +55,12 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
public T createInstance() {
return InstantiationUtil.instantiate(this.valueClass);
}
@Override
public T copy(T from) {
return copy(from, createInstance());
}
@Override
public T copy(T from, T reuse) {
from.copyTo(reuse);
......@@ -73,6 +78,11 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
value.write(target);
}
@Override
public T deserialize(DataInputView source) throws IOException {
return deserialize(createInstance(), source);
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
reuse.read(source);
......@@ -92,4 +102,19 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
instance = createInstance();
}
}
@Override
public int hashCode() {
return this.valueClass.hashCode() + 9231;
}
@Override
public boolean equals(Object obj) {
if (obj.getClass() == CopyableValueSerializer.class) {
CopyableValueSerializer<?> other = (CopyableValueSerializer<?>) obj;
return this.valueClass == other.valueClass;
} else {
return false;
}
}
}
......@@ -21,6 +21,7 @@ package org.apache.flink.api.java.typeutils.runtime;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
......@@ -74,11 +75,16 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
return kryo.newInstance(typeToInstantiate);
}
@Override
public T copy(T from) {
checkKryoInitialized();
return kryo.copy(from);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
reuse = kryo.copy(from);
return reuse;
return kryo.copy(from);
}
@Override
......@@ -100,15 +106,19 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
public T deserialize(DataInputView source) throws IOException {
checkKryoInitialized();
if (source != previousIn) {
DataInputViewStream inputStream = new DataInputViewStream(source);
input = new NoFetchingInput(inputStream);
previousIn = source;
}
reuse = kryo.readObject(input, typeToInstantiate);
return reuse;
return kryo.readObject(input, typeToInstantiate);
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
......@@ -121,6 +131,25 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
T tmp = deserialize(copyInstance, source);
serialize(tmp, target);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return type.hashCode() + 31 * typeToInstantiate.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj != null && obj instanceof KryoSerializer) {
KryoSerializer<?> other = (KryoSerializer<?>) obj;
return other.type == this.type && other.typeToInstantiate == this.typeToInstantiate;
} else {
return false;
}
}
// --------------------------------------------------------------------------------------------
private final void checkKryoInitialized() {
if (this.kryo == null) {
......
......@@ -130,6 +130,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
@Override
public T copy(T from) {
T target;
try {
target = clazz.newInstance();
}
catch (Throwable t) {
throw new RuntimeException("Cannot instantiate class.", t);
}
try {
for (int i = 0; i < numFields; i++) {
Object copy = fieldSerializers[i].copy(fields[i].get(from));
fields[i].set(target, copy);
}
}
catch (IllegalAccessException e) {
throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields before.");
}
return target;
}
@Override
public T copy(T from, T reuse) {
try {
......@@ -164,6 +186,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
}
}
@Override
public T deserialize(DataInputView source) throws IOException {
T target;
try {
target = clazz.newInstance();
}
catch (Throwable t) {
throw new RuntimeException("Cannot instantiate class.", t);
}
try {
for (int i = 0; i < numFields; i++) {
Object field = fieldSerializers[i].deserialize(source);
fields[i].set(target, field);
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Error during POJO copy, this should not happen since we check the fields" +
"before.");
}
return target;
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
try {
......
......@@ -31,7 +31,6 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
private static final long serialVersionUID = 1L;
@SuppressWarnings("unchecked")
public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
super(tupleClass, fieldSerializers);
}
......@@ -68,6 +67,11 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
}
}
@Override
public T copy(T from) {
return copy(from, instantiateRaw());
}
@Override
public T copy(T from, T reuse) {
for (int i = 0; i < arity; i++) {
......@@ -90,6 +94,16 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
}
}
@Override
public T deserialize(DataInputView source) throws IOException {
T tuple = instantiateRaw();
for (int i = 0; i < arity; i++) {
Object field = fieldSerializers[i].deserialize(source);
tuple.setField(field, i);
}
return tuple;
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
for (int i = 0; i < arity; i++) {
......@@ -98,4 +112,13 @@ public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<
}
return reuse;
}
private T instantiateRaw() {
try {
return tupleClass.newInstance();
}
catch (Exception e) {
throw new RuntimeException("Cannot instantiate tuple.", e);
}
}
}
......@@ -28,6 +28,8 @@ import java.util.Arrays;
public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 1L;
protected final Class<T> tupleClass;
protected final TypeSerializer<Object>[] fieldSerializers;
......
......@@ -71,11 +71,16 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
return InstantiationUtil.instantiate(this.type);
}
@Override
public T copy(T from) {
checkKryoInitialized();
return this.kryo.copy(from);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
reuse = this.kryo.copy(from);
return reuse;
return this.kryo.copy(from);
}
@Override
......@@ -88,6 +93,11 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
value.write(target);
}
@Override
public T deserialize(DataInputView source) throws IOException {
return deserialize(createInstance(), source);
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
reuse.read(source);
......@@ -111,4 +121,21 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
this.kryo.register(type);
}
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return this.type.hashCode() + 17;
}
@Override
public boolean equals(Object obj) {
if (obj.getClass() == ValueSerializer.class) {
ValueSerializer<?> other = (ValueSerializer<?>) obj;
return this.type == other.type;
} else {
return false;
}
}
}
......@@ -47,11 +47,16 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
return InstantiationUtil.instantiate(typeClass);
}
@Override
public T copy(T from) {
checkKryoInitialized();
return this.kryo.copy(from);
}
@Override
public T copy(T from, T reuse) {
checkKryoInitialized();
reuse = this.kryo.copy(from);
return reuse;
return this.kryo.copy(from);
}
@Override
......@@ -64,6 +69,11 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
record.write(target);
}
@Override
public T deserialize(DataInputView source) throws IOException {
return deserialize(createInstance(), source);
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
reuse.readFields(source);
......@@ -102,4 +112,20 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
this.kryo.register(typeClass);
}
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return this.typeClass.hashCode() + 177;
}
@Override
public boolean equals(Object obj) {
if (obj.getClass() == WritableSerializer.class) {
WritableSerializer<?> other = (WritableSerializer<?>) obj;
return this.typeClass == other.typeClass;
} else {
return false;
}
}
}
......@@ -16,24 +16,21 @@
* limitations under the License.
*/
package org.apache.flink.runtime.operators.resettable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.junit.Assert;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.types.IntValueSerializer;
import org.apache.flink.types.IntValue;
import org.junit.Assert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
......
......@@ -19,6 +19,7 @@
package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
......@@ -43,10 +44,15 @@ public class IntListSerializer extends TypeSerializer<IntList> {
return new IntList();
}
@Override
public IntList copy(IntList from) {
return new IntList(from.getKey(), Arrays.copyOf(from.getValue(), from.getValue().length));
}
@Override
public IntList copy(IntList from, IntList reuse) {
reuse.setKey(from.getKey());
reuse.setValue(from.getValue());
reuse.setValue(Arrays.copyOf(from.getValue(), from.getValue().length));
return reuse;
}
......@@ -73,6 +79,11 @@ public class IntListSerializer extends TypeSerializer<IntList> {
}
}
@Override
public IntList deserialize(DataInputView source) throws IOException {
return deserialize(new IntList(), source);
}
@Override
public IntList deserialize(IntList record, DataInputView source) throws IOException {
int key = source.readInt();
......
......@@ -46,6 +46,11 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
public IntPair createInstance() {
return new IntPair();
}
@Override
public IntPair copy(IntPair from) {
return new IntPair(from.getKey(), from.getValue());
}
@Override
public IntPair copy(IntPair from, IntPair reuse) {
......@@ -66,6 +71,11 @@ public class IntPairSerializer extends TypeSerializer<IntPair> {
target.writeInt(record.getValue());
}
@Override
public IntPair deserialize(DataInputView source) throws IOException {
return new IntPair(source.readInt(), source.readInt());
}
@Override
public IntPair deserialize(IntPair reuse, DataInputView source) throws IOException {
reuse.setKey(source.readInt());
......
......@@ -44,6 +44,10 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
return new StringPair();
}
@Override
public StringPair copy(StringPair from) {
return new StringPair(from.getKey(), from.getValue());
}
@Override
public StringPair copy(StringPair from, StringPair reuse) {
reuse.setKey(from.getKey());
......@@ -62,6 +66,11 @@ public class StringPairSerializer extends TypeSerializer<StringPair> {
StringValue.writeString(record.getValue(), target);
}
@Override
public StringPair deserialize(DataInputView source) throws IOException {
return new StringPair(StringValue.readString(source), StringValue.readString(source));
}
@Override
public StringPair deserialize(StringPair record, DataInputView source) throws IOException {
record.setKey(StringValue.readString(source));
......
......@@ -31,8 +31,11 @@ abstract class CaseClassSerializer[T <: Product](
scalaFieldSerializers: Array[TypeSerializer[_]])
extends TupleSerializerBase[T](clazz, scalaFieldSerializers) {
@transient var fields : Array[AnyRef] = _
def createInstance: T = {
val fields: Array[AnyRef] = new Array(arity)
initArray()
for (i <- 0 until arity) {
fields(i) = fieldSerializers(i).createInstance()
}
......@@ -40,7 +43,11 @@ abstract class CaseClassSerializer[T <: Product](
}
def copy(from: T, reuse: T): T = {
val fields: Array[AnyRef] = new Array(arity)
copy(from)
}
def copy(from: T): T = {
initArray()
for (i <- 0 until arity) {
fields(i) = from.productElement(i).asInstanceOf[AnyRef]
}
......@@ -55,11 +62,25 @@ abstract class CaseClassSerializer[T <: Product](
}
def deserialize(reuse: T, source: DataInputView): T = {
val fields: Array[AnyRef] = new Array(arity)
initArray()
for (i <- 0 until arity) {
val field = reuse.productElement(i).asInstanceOf[AnyRef]
fields(i) = fieldSerializers(i).deserialize(field, source)
}
createInstance(fields)
}
def deserialize(source: DataInputView): T = {
initArray()
for (i <- 0 until arity) {
fields(i) = fieldSerializers(i).deserialize(source)
}
createInstance(fields)
}
def initArray() = {
if (fields == null) {
fields = new Array[AnyRef](arity)
}
}
}
......@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public final class VertexWithAdjacencyListSerializer extends TypeSerializer<VertexWithAdjacencyList> {
public final class VertexWithAdjacencyListSerializer extends TypeSerializerSingleton<VertexWithAdjacencyList> {
private static final long serialVersionUID = 1L;
......@@ -44,6 +44,14 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
return new VertexWithAdjacencyList();
}
@Override
public VertexWithAdjacencyList copy(VertexWithAdjacencyList from) {
VertexWithAdjacencyList copy = new VertexWithAdjacencyList(from.getVertexID(), new long[from.getNumTargets()]);
copy.setNumTargets(from.getNumTargets());
System.arraycopy(from.getTargets(), 0, copy.getTargets(), 0, from.getNumTargets());
return copy;
}
@Override
public VertexWithAdjacencyList copy(VertexWithAdjacencyList from, VertexWithAdjacencyList reuse) {
if (reuse.getTargets().length < from.getTargets().length) {
......@@ -74,6 +82,11 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
}
}
@Override
public VertexWithAdjacencyList deserialize(DataInputView source) throws IOException {
return deserialize(new VertexWithAdjacencyList(), source);
}
@Override
public VertexWithAdjacencyList deserialize(VertexWithAdjacencyList target, DataInputView source) throws IOException {
target.setVertexID(source.readLong());
......@@ -101,16 +114,4 @@ public final class VertexWithAdjacencyListSerializer extends TypeSerializer<Vert
target.writeInt(numTargets);
target.write(source, numTargets * 8);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return 3;
}
@Override
public boolean equals(Object obj) {
return obj != null && obj.getClass() == VertexWithAdjacencyListSerializer.class;
}
}
......@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<VertexWithRankAndDangling> {
public final class VertexWithRankAndDanglingSerializer extends TypeSerializerSingleton<VertexWithRankAndDangling> {
private static final long serialVersionUID = 1L;
......@@ -44,6 +44,11 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
return new VertexWithRankAndDangling();
}
@Override
public VertexWithRankAndDangling copy(VertexWithRankAndDangling from) {
return new VertexWithRankAndDangling(from.getVertexID(), from.getRank(), from.isDangling());
}
@Override
public VertexWithRankAndDangling copy(VertexWithRankAndDangling from, VertexWithRankAndDangling reuse) {
reuse.setVertexID(from.getVertexID());
......@@ -64,6 +69,11 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
target.writeBoolean(record.isDangling());
}
@Override
public VertexWithRankAndDangling deserialize(DataInputView source) throws IOException {
return new VertexWithRankAndDangling(source.readLong(), source.readDouble(), source.readBoolean());
}
@Override
public VertexWithRankAndDangling deserialize(VertexWithRankAndDangling target, DataInputView source) throws IOException {
target.setVertexID(source.readLong());
......@@ -76,16 +86,4 @@ public final class VertexWithRankAndDanglingSerializer extends TypeSerializer<Ve
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 17);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return 2;
}
@Override
public boolean equals(Object obj) {
return obj != null && obj.getClass() == VertexWithRankAndDanglingSerializer.class;
}
}
......@@ -20,11 +20,11 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRank> {
public final class VertexWithRankSerializer extends TypeSerializerSingleton<VertexWithRank> {
private static final long serialVersionUID = 1L;
......@@ -43,6 +43,11 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
public VertexWithRank createInstance() {
return new VertexWithRank();
}
@Override
public VertexWithRank copy(VertexWithRank from) {
return new VertexWithRank(from.getVertexID(), from.getRank());
}
@Override
public VertexWithRank copy(VertexWithRank from, VertexWithRank reuse) {
......@@ -62,6 +67,11 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
target.writeDouble(record.getRank());
}
@Override
public VertexWithRank deserialize(DataInputView source) throws IOException {
return new VertexWithRank(source.readLong(), source.readDouble());
}
@Override
public VertexWithRank deserialize(VertexWithRank target, DataInputView source) throws IOException {
target.setVertexID(source.readLong());
......@@ -73,16 +83,4 @@ public final class VertexWithRankSerializer extends TypeSerializer<VertexWithRan
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.write(source, 16);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return 1;
}
@Override
public boolean equals(Object obj) {
return obj != null && obj.getClass() == VertexWithRankSerializer.class;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册