Commit 57b8e66a authored by Aljoscha Krettek

Refactor TupleTypeInfo and add GenericPairComparator

There are now TupleTypeInfoBase, TupleSerializerBase, and TupleComparatorBase, which
serve as superclasses of TupleTypeInfo, TupleSerializer, and TupleComparator.

Also rename the compare method that reads from DataInputView to compareSerialized,
because Scala cannot distinguish between the two compare overloads for some reason.
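
For context, a condensed sketch of the two overloads involved (imports and all
other members omitted; this is not the full class):

    public abstract class TypeComparator<T> implements Serializable {

        // Compares two records in their deserialized (object) form.
        public abstract int compare(T first, T second);

        // Compares two records directly in their serialized form. Renamed from
        // compare(DataInputView, DataInputView) in this commit so that Scala code
        // can tell the two overloads apart.
        public abstract int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException;
    }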

This change is necessary to allow the Scala API to reuse most of this
functionality.

The GenericPairComparator uses the new extractKeys method of
TypeComparator to compare values of any type. It replaces
TuplePairComparator and some other special-case pair comparators. This
is preparatory work for supporting Scala Tuples and POJO comparators.
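
As a usage sketch (illustrative only: tupleComparator, pojoComparator, someTuple,
somePojo and MyPojo are placeholders and not part of this commit):

    // Both sides only need a TypeComparator that implements extractKeys()/getComparators();
    // the records themselves no longer have to be Tuples of matching arity.
    GenericPairComparator<Tuple2<Integer, String>, MyPojo> pairComparator =
        new GenericPairComparator<Tuple2<Integer, String>, MyPojo>(tupleComparator, pojoComparator);

    pairComparator.setReference(someTuple);                   // caches tupleComparator.extractKeys(someTuple)
    boolean same = pairComparator.equalToReference(somePojo); // compares cached keys against pojoComparator.extractKeys(somePojo)
    int order = pairComparator.compareToReference(somePojo);  // first non-zero key comparison decides the order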
Parent 7f946cee
......@@ -177,7 +177,7 @@ public abstract class TypeComparator<T> implements Serializable {
*
* @see java.util.Comparator#compare(Object, Object)
*/
public abstract int compare(DataInputView firstSource, DataInputView secondSource) throws IOException;
public abstract int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException;
// --------------------------------------------------------------------------------------------
......@@ -286,7 +286,21 @@ public abstract class TypeComparator<T> implements Serializable {
public abstract TypeComparator<T> duplicate();
// --------------------------------------------------------------------------------------------
/**
* Extracts the key fields from a record. This is for use by the PairComparator to provide
* interoperability between different record types.
*/
public abstract Object[] extractKeys(T record);
/**
* Get the field comparators. This is used together with {@link #extractKeys(Object)} to provide
* interoperability between different record types.
*/
public abstract TypeComparator[] getComparators();
// --------------------------------------------------------------------------------------------
@SuppressWarnings("rawtypes")
public int compareAgainstReference(Comparable[] keys) {
throw new UnsupportedOperationException("Workaround hack.");
......
......@@ -32,6 +32,12 @@ public abstract class BasicTypeComparator<T extends Comparable<T>> extends TypeC
private transient T reference;
protected final boolean ascendingComparison;
// This is used in extractKeys, so that we don't create a new array for every call.
private final Comparable[] extractedKey = new Comparable[1];
// For use by getComparators
private final TypeComparator[] comparators = new TypeComparator[] {this};
protected BasicTypeComparator(boolean ascending) {
......@@ -80,6 +86,17 @@ public abstract class BasicTypeComparator<T extends Comparable<T>> extends TypeC
throw new UnsupportedOperationException();
}
@Override
public Object[] extractKeys(T record) {
extractedKey[0] = record;
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
@Override
public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
......
......@@ -35,7 +35,7 @@ public final class BooleanComparator extends BasicTypeComparator<Boolean> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
final int fs = firstSource.readBoolean() ? 1 : 0;
final int ss = secondSource.readBoolean() ? 1 : 0;
int comp = fs - ss;
......
......@@ -35,7 +35,7 @@ public final class ByteComparator extends BasicTypeComparator<Byte> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
byte b1 = firstSource.readByte();
byte b2 = secondSource.readByte();
int comp = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
......
......@@ -35,7 +35,7 @@ public final class CharComparator extends BasicTypeComparator<Character> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
char c1 = firstSource.readChar();
char c2 = secondSource.readChar();
int comp = (c1 < c2 ? -1 : (c1 == c2 ? 0 : 1));
......
......@@ -35,7 +35,7 @@ public final class DoubleComparator extends BasicTypeComparator<Double> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
double l1 = firstSource.readDouble();
double l2 = secondSource.readDouble();
int comp = (l1 < l2 ? -1 : (l1 > l2 ? 1 : 0));
......
......@@ -35,7 +35,7 @@ public final class FloatComparator extends BasicTypeComparator<Float> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
float l1 = firstSource.readFloat();
float l2 = secondSource.readFloat();
int comp = (l1 < l2 ? -1 : (l1 > l2 ? 1 : 0));
......
......@@ -35,7 +35,7 @@ public final class IntComparator extends BasicTypeComparator<Integer> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
int i1 = firstSource.readInt();
int i2 = secondSource.readInt();
int comp = (i1 < i2 ? -1 : (i1 == i2 ? 0 : 1));
......
......@@ -35,7 +35,7 @@ public final class LongComparator extends BasicTypeComparator<Long> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
long l1 = firstSource.readLong();
long l2 = secondSource.readLong();
int comp = (l1 < l2 ? -1 : (l1 == l2 ? 0 : 1));
......
......@@ -35,7 +35,7 @@ public final class ShortComparator extends BasicTypeComparator<Short> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
short s1 = firstSource.readShort();
short s2 = secondSource.readShort();
int comp = (s1 < s2 ? -1 : (s1 == s2 ? 0 : 1));
......
......@@ -42,7 +42,7 @@ public final class StringComparator extends BasicTypeComparator<String> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
String s1 = StringValue.readString(firstSource);
String s2 = StringValue.readString(secondSource);
int comp = s1.compareTo(s2);
......
......@@ -26,8 +26,6 @@ import java.io.IOException;
import static org.junit.Assert.*;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
......@@ -108,7 +106,7 @@ public abstract class ComparatorTestBase<T> {
writeSortedData(d, out1);
in1 = out1.getInputView();
assertTrue(comparator.compare(in1, in2) == 0);
assertTrue(comparator.compareSerialized(in1, in2) == 0);
}
} catch (Exception e) {
System.err.println(e.getMessage());
......@@ -173,16 +171,16 @@ public abstract class ComparatorTestBase<T> {
in2 = out2.getInputView();
if (greater && ascending) {
assertTrue(comparator.compare(in1, in2) < 0);
assertTrue(comparator.compareSerialized(in1, in2) < 0);
}
if (greater && !ascending) {
assertTrue(comparator.compare(in1, in2) > 0);
assertTrue(comparator.compareSerialized(in1, in2) > 0);
}
if (!greater && ascending) {
assertTrue(comparator.compare(in2, in1) > 0);
assertTrue(comparator.compareSerialized(in2, in1) > 0);
}
if (!greater && !ascending) {
assertTrue(comparator.compare(in2, in1) < 0);
assertTrue(comparator.compareSerialized(in2, in1) < 0);
}
}
}
......
......@@ -23,81 +23,29 @@ import java.util.Arrays;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleLeadingFieldComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.types.TypeInformation;
//CHECKSTYLE.OFF: AvoidStarImport - Needed for TupleGenerator
import org.apache.flink.api.java.tuple.*;
//CHECKSTYLE.ON: AvoidStarImport
public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implements CompositeType<T> {
private final TypeInformation<?>[] types;
private final Class<T> tupleType;
public TupleTypeInfo(TypeInformation<?>... types) {
if (types == null || types.length == 0 || types.length > Tuple.MAX_ARITY) {
throw new IllegalArgumentException();
}
@SuppressWarnings("unchecked")
Class<T> typeClass = (Class<T>) CLASSES[types.length - 1];
this.types = types;
this.tupleType = typeClass;
public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
@SuppressWarnings("unchecked")
public TupleTypeInfo(TypeInformation<?>... types) {
this((Class<T>) CLASSES[types.length - 1], types);
}
public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
super(tupleType, types);
if (types == null || types.length == 0 || types.length > Tuple.MAX_ARITY) {
throw new IllegalArgumentException();
}
this.tupleType = tupleType;
this.types = types;
}
@Override
public boolean isBasicType() {
return false;
}
@Override
public boolean isTupleType() {
return true;
}
@Override
public int getArity() {
return types.length;
}
@Override
public Class<T> getTypeClass() {
return tupleType;
}
public <X> TypeInformation<X> getTypeAt(int pos) {
if (pos < 0 || pos >= this.types.length) {
throw new IndexOutOfBoundsException();
}
@SuppressWarnings("unchecked")
TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
return typed;
}
@Override
public boolean isKeyType() {
return this.isValidKeyType(this);
}
@Override
public TupleSerializer<T> createSerializer() {
TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[getArity()];
......@@ -118,14 +66,7 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
{
throw new IllegalArgumentException();
}
// special case for tuples where field zero is the key field
if (logicalKeyFields.length == 1 && logicalKeyFields[0] == 0 && !types[0].isTupleType()) {
return createLeadingFieldComparator(orders[0], types[0]);
}
// --- general case ---
int maxKey = -1;
for (int key : logicalKeyFields){
maxKey = Math.max(key, maxKey);
......@@ -169,39 +110,12 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
}
// --------------------------------------------------------------------------------------------
@Override
public boolean equals(Object obj) {
if (obj instanceof TupleTypeInfo) {
@SuppressWarnings("unchecked")
TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj;
return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) &&
Arrays.deepEquals(this.types, other.types);
} else {
return false;
}
}
@Override
public int hashCode() {
return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder("Tuple");
bld.append(types.length).append('<');
bld.append(types[0]);
for (int i = 1; i < types.length; i++) {
bld.append(", ").append(types[i]);
}
bld.append('>');
return bld.toString();
return "Java " + super.toString();
}
// --------------------------------------------------------------------------------------------
public static <X extends Tuple> TupleTypeInfo<X> getBasicTupleTypeInfo(Class<?>... basicTypes) {
......@@ -227,23 +141,7 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
TupleTypeInfo<X> tupleInfo = (TupleTypeInfo<X>) new TupleTypeInfo<Tuple>(infos);
return tupleInfo;
}
private boolean isValidKeyType(TypeInformation<?> typeInfo) {
if(typeInfo instanceof TupleTypeInfo) {
TupleTypeInfo<?> tupleType = ((TupleTypeInfo<?>)typeInfo);
for(int i=0;i<tupleType.getArity();i++) {
if (!isValidKeyType(tupleType.getTypeAt(i))) {
return false;
}
}
return true;
} else if(typeInfo.isKeyType()) {
return true;
} else {
return false;
}
}
// --------------------------------------------------------------------------------------------
// The following lines are generated.
// --------------------------------------------------------------------------------------------
......@@ -254,15 +152,4 @@ public class TupleTypeInfo<T extends Tuple> extends TypeInformation<T> implement
Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class
};
// END_OF_TUPLE_DEPENDENT_CODE
private static final <T extends Tuple, K> TypeComparator<T> createLeadingFieldComparator(boolean ascending, TypeInformation<?> info) {
if (!(info.isKeyType() && info instanceof AtomicType)) {
throw new IllegalArgumentException("The field at position 0 (" + info + ") is no atomic key type.");
}
@SuppressWarnings("unchecked")
AtomicType<K> typedInfo = (AtomicType<K>) info;
return new TupleLeadingFieldComparator<T, K>(typedInfo.createComparator(ascending));
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils;
import java.util.Arrays;
import org.apache.flink.types.TypeInformation;
public abstract class TupleTypeInfoBase<T> extends TypeInformation<T> implements CompositeType<T> {
protected final TypeInformation<?>[] types;
protected final Class<T> tupleType;
public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) {
this.tupleType = tupleType;
this.types = types;
}
@Override
public boolean isBasicType() {
return false;
}
@Override
public boolean isTupleType() {
return true;
}
@Override
public int getArity() {
return types.length;
}
@Override
public Class<T> getTypeClass() {
return tupleType;
}
public <X> TypeInformation<X> getTypeAt(int pos) {
if (pos < 0 || pos >= this.types.length) {
throw new IndexOutOfBoundsException();
}
@SuppressWarnings("unchecked")
TypeInformation<X> typed = (TypeInformation<X>) this.types[pos];
return typed;
}
@Override
public boolean isKeyType() {
return isValidKeyType(this);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof TupleTypeInfoBase) {
@SuppressWarnings("unchecked")
TupleTypeInfoBase<T> other = (TupleTypeInfoBase<T>) obj;
return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) &&
Arrays.deepEquals(this.types, other.types);
} else {
return false;
}
}
@Override
public int hashCode() {
return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
}
private boolean isValidKeyType(TypeInformation<?> typeInfo) {
if(typeInfo instanceof TupleTypeInfoBase) {
TupleTypeInfoBase<?> tupleType = ((TupleTypeInfoBase<?>)typeInfo);
for(int i=0;i<tupleType.getArity();i++) {
if (!isValidKeyType(tupleType.getTypeAt(i))) {
return false;
}
}
return true;
} else {
return typeInfo.isKeyType();
}
}
@Override
public String toString() {
StringBuilder bld = new StringBuilder("Tuple");
bld.append(types.length).append('<');
bld.append(types[0]);
for (int i = 1; i < types.length; i++) {
bld.append(", ").append(types[i]);
}
bld.append('>');
return bld.toString();
}
}
......@@ -42,8 +42,11 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>>
private transient T reference;
private transient T tempReference;
private final Comparable[] extractedKey = new Comparable[1];
private final TypeComparator[] comparators = new TypeComparator[] {this};
public CopyableValueComparator(boolean ascending, Class<T> type) {
this.type = type;
this.ascendingComparison = ascending;
......@@ -79,7 +82,7 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>>
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
if (tempReference == null) {
tempReference = InstantiationUtil.instantiate(type, CopyableValue.class);
}
......@@ -121,6 +124,17 @@ public class CopyableValueComparator<T extends CopyableValue<T> & Comparable<T>>
public TypeComparator<T> duplicate() {
return new CopyableValueComparator<T>(ascendingComparison, type);
}
@Override
public Object[] extractKeys(T record) {
extractedKey[0] = record;
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
......
......@@ -22,64 +22,54 @@ import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.types.NullFieldException;
import org.apache.flink.types.NullKeyFieldException;
public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends TypePairComparator<T1, T2> implements Serializable {
public class GenericPairComparator<T1, T2> extends TypePairComparator<T1, T2>
implements Serializable {
private static final long serialVersionUID = 1L;
private final int[] keyFields1, keyFields2;
private final TypeComparator<T1> comparator1;
private final TypeComparator<T2> comparator2;
private final TypeComparator<Object>[] comparators1;
private final TypeComparator<Object>[] comparators2;
private final Object[] referenceKeyFields;
@SuppressWarnings("unchecked")
public TuplePairComparator(int[] keyFields1, int[] keyFields2, TypeComparator<Object>[] comparators1, TypeComparator<Object>[] comparators2) {
if(keyFields1.length != keyFields2.length
|| keyFields1.length != comparators1.length
|| keyFields2.length != comparators2.length) {
public GenericPairComparator(TypeComparator<T1> comparator1, TypeComparator<T2> comparator2) {
this.comparator1 = comparator1;
this.comparator2 = comparator2;
this.comparators1 = comparator1.getComparators();
this.comparators2 = comparator2.getComparators();
if(comparators1.length != comparators2.length) {
throw new IllegalArgumentException("Number of key fields and comparators differ.");
}
int numKeys = keyFields1.length;
this.keyFields1 = keyFields1;
this.keyFields2 = keyFields2;
this.comparators1 = new TypeComparator[numKeys];
this.comparators2 = new TypeComparator[numKeys];
int numKeys = comparators1.length;
for(int i = 0; i < numKeys; i++) {
this.comparators1[i] = comparators1[i].duplicate();
this.comparators2[i] = comparators2[i].duplicate();
}
this.referenceKeyFields = new Object[numKeys];
}
@Override
public void setReference(T1 reference) {
for (int i = 0; i < this.comparators1.length; i++) {
try {
this.comparators1[i].setReference(reference
.getFieldNotNull(keyFields1[i]));
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
}
}
Object[] keys = comparator1.extractKeys(reference);
System.arraycopy(keys, 0, referenceKeyFields, 0, keys.length);
}
@Override
public boolean equalToReference(T2 candidate) {
Object[] keys = comparator2.extractKeys(candidate);
for (int i = 0; i < this.comparators1.length; i++) {
try {
if (!this.comparators1[i].equalToReference(candidate
.getFieldNotNull(keyFields2[i]))) {
return false;
}
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
if (this.comparators1[i].compare(referenceKeyFields[i], keys[i]) != 0) {
return false;
}
}
return true;
......@@ -87,17 +77,13 @@ public class TuplePairComparator<T1 extends Tuple, T2 extends Tuple> extends Typ
@Override
public int compareToReference(T2 candidate) {
Object[] keys = comparator2.extractKeys(candidate);
for (int i = 0; i < this.comparators1.length; i++) {
try {
this.comparators2[i].setReference(candidate
.getFieldNotNull(keyFields2[i]));
int res = this.comparators1[i]
.compareToReference(this.comparators2[i]);
if (res != 0) {
return res;
}
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
// We reverse ordering here because our "compareToReference" does work in a mirrored
// way compared to Comparable.compareTo
int res = this.comparators1[i].compare(keys[i], referenceKeyFields[i]);
if(res != 0) {
return res;
}
}
return 0;
......
......@@ -52,6 +52,10 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
private transient Kryo kryo;
private final Comparable[] extractedKey = new Comparable[1];
private final TypeComparator[] comparators = new TypeComparator[] {this};
// ------------------------------------------------------------------------
public GenericTypeComparator(boolean ascending, TypeSerializer<T> serializer, Class<T> type) {
......@@ -100,7 +104,7 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
}
@Override
public int compare(final DataInputView firstSource, final DataInputView secondSource) throws IOException {
public int compareSerialized(final DataInputView firstSource, final DataInputView secondSource) throws IOException {
if (this.serializer == null) {
this.serializer = this.serializerFactory.getSerializer();
}
......@@ -164,6 +168,17 @@ public class GenericTypeComparator<T extends Comparable<T>> extends TypeComparat
}
}
@Override
public Object[] extractKeys(T record) {
extractedKey[0] = record;
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
// ------------------------------------------------------------------------
@Override
......
......@@ -52,6 +52,7 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
private final Class<T> type;
private final Comparable[] extractedKeys;
@SuppressWarnings("unchecked")
public PojoComparator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer, Class<T> type) {
......@@ -101,6 +102,8 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
this.numLeadingNormalizableKeys = nKeys;
this.normalizableKeyPrefixLen = nKeyLen;
this.invertNormKey = inverted;
extractedKeys = new Comparable[keyFields.length];
}
@SuppressWarnings("unchecked")
......@@ -128,7 +131,7 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
throw new RuntimeException("Cannot copy serializer", e);
}
extractedKeys = new Comparable[keyFields.length];
}
private void writeObject(ObjectOutputStream out)
......@@ -172,7 +175,7 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
return this.keyFields;
}
public TypeComparator<Object>[] getComparators() {
public TypeComparator[] getComparators() {
return this.comparators;
}
......@@ -269,7 +272,7 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
T first = this.serializer.createInstance();
T second = this.serializer.createInstance();
......@@ -343,6 +346,23 @@ public final class PojoComparator<T> extends TypeComparator<T> implements java.i
return new PojoComparator<T>(this);
}
@Override
public Object[] extractKeys(T record) {
int i = 0;
try {
for (; i < keyFields.length; i++) {
extractedKeys[i] = (Comparable) keyFields[i].get(record);
}
}
catch (IllegalAccessException iaex) {
throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
}
catch (NullPointerException npex) {
throw new NullKeyFieldException(this.keyFields[i].toString());
}
return extractedKeys;
}
// --------------------------------------------------------------------------------------------
/**
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
public class PojoPairComparator<T1, T2> extends TypePairComparator<T1, T2> implements Serializable {
private static final long serialVersionUID = 1L;
@SuppressWarnings("unused")
private final int[] keyPositions1, keyPositions2;
private transient Field[] keyFields1, keyFields2;
private final TypeComparator<Object>[] comparators1;
private final TypeComparator<Object>[] comparators2;
@SuppressWarnings("unchecked")
public PojoPairComparator(int[] keyPositions1, Field[] keyFields1, int[] keyPositions2, Field[] keyFields2, TypeComparator<Object>[] comparators1, TypeComparator<Object>[] comparators2) {
if(keyPositions1.length != keyPositions2.length
|| keyPositions1.length != comparators1.length
|| keyPositions2.length != comparators2.length) {
throw new IllegalArgumentException("Number of key fields and comparators differ.");
}
int numKeys = keyPositions1.length;
this.keyPositions1 = keyPositions1;
this.keyPositions2 = keyPositions2;
this.keyFields1 = keyFields1;
this.keyFields2 = keyFields2;
this.comparators1 = new TypeComparator[numKeys];
this.comparators2 = new TypeComparator[numKeys];
for(int i = 0; i < numKeys; i++) {
this.comparators1[i] = comparators1[i].duplicate();
this.comparators2[i] = comparators2[i].duplicate();
}
}
private void writeObject(ObjectOutputStream out)
throws IOException, ClassNotFoundException {
out.defaultWriteObject();
out.writeInt(keyFields1.length);
for (Field field: keyFields1) {
out.writeObject(field.getDeclaringClass());
out.writeUTF(field.getName());
}
out.writeInt(keyFields2.length);
for (Field field: keyFields2) {
out.writeObject(field.getDeclaringClass());
out.writeUTF(field.getName());
}
}
private void readObject(ObjectInputStream in)
throws IOException, ClassNotFoundException {
in.defaultReadObject();
int numKeyFields = in.readInt();
keyFields1 = new Field[numKeyFields];
for (int i = 0; i < numKeyFields; i++) {
Class<?> clazz = (Class<?>)in.readObject();
String fieldName = in.readUTF();
try {
keyFields1[i] = clazz.getField(fieldName);
keyFields1[i].setAccessible(true);
} catch (NoSuchFieldException e) {
throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup.");
}
}
numKeyFields = in.readInt();
keyFields2 = new Field[numKeyFields];
for (int i = 0; i < numKeyFields; i++) {
Class<?> clazz = (Class<?>)in.readObject();
String fieldName = in.readUTF();
try {
keyFields2[i] = clazz.getField(fieldName);
keyFields2[i].setAccessible(true);
} catch (NoSuchFieldException e) {
throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup.");
}
}
}
@Override
public void setReference(T1 reference) {
for(int i=0; i < this.comparators1.length; i++) {
try {
this.comparators1[i].setReference(keyFields1[i].get(reference));
} catch (IllegalAccessException e) {
throw new RuntimeException("This should not happen since we call setAccesssible(true) in PojoTypeInfo.");
}
}
}
@Override
public boolean equalToReference(T2 candidate) {
for(int i=0; i < this.comparators1.length; i++) {
try {
if(!this.comparators1[i].equalToReference(keyFields2[i].get(candidate))) {
return false;
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup.");
}
}
return true;
}
@Override
public int compareToReference(T2 candidate) {
for(int i=0; i < this.comparators1.length; i++) {
try {
this.comparators2[i].setReference(keyFields2[i].get(candidate));
} catch (IllegalAccessException e) {
throw new RuntimeException("Class resolved at TaskManager is not compatible with class read during Plan setup.");
}
int res = this.comparators1[i].compareToReference(this.comparators2[i]);
if(res != 0) {
return res;
}
}
return 0;
}
}
......@@ -16,129 +16,30 @@
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.java.tuple.Tuple;
public final class RuntimePairComparatorFactory<T1 extends Tuple, T2 extends Tuple> implements TypePairComparatorFactory<T1, T2>, java.io.Serializable {
public final class RuntimePairComparatorFactory<T1, T2>
implements TypePairComparatorFactory<T1, T2>, java.io.Serializable {
private static final long serialVersionUID = 1L;
@SuppressWarnings("unchecked")
@Override
public TypePairComparator<T1, T2> createComparator12(TypeComparator<T1> comparator1, TypeComparator<T2> comparator2) {
if ((comparator1 instanceof TupleLeadingFieldComparator) && (comparator2 instanceof TupleLeadingFieldComparator)) {
TypeComparator<?> comp1 = ((TupleLeadingFieldComparator<?,?>) comparator1).getFieldComparator();
TypeComparator<?> comp2 = ((TupleLeadingFieldComparator<?,?>) comparator2).getFieldComparator();
return createLeadingFieldPairComp(comp1, comp2);
}
else {
int[] keyPos1;
int[] keyPos2;
TypeComparator<Object>[] comps1;
TypeComparator<Object>[] comps2;
// get info from first comparator
if (comparator1 instanceof TupleComparator) {
TupleComparator<?> tupleComp1 = (TupleComparator<?>) comparator1;
keyPos1 = tupleComp1.getKeyPositions();
comps1 = tupleComp1.getComparators();
}
else if (comparator1 instanceof TupleLeadingFieldComparator) {
TupleLeadingFieldComparator<?, ?> tupleComp1 = (TupleLeadingFieldComparator<?, ?>) comparator1;
keyPos1 = new int[] {0};
comps1 = new TypeComparator[] { tupleComp1.getFieldComparator() };
}
else {
throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparator: " + comparator1);
}
// get info from second comparator
if (comparator2 instanceof TupleComparator) {
TupleComparator<?> tupleComp2 = (TupleComparator<?>) comparator2;
keyPos2 = tupleComp2.getKeyPositions();
comps2 = tupleComp2.getComparators();
}
else if (comparator2 instanceof TupleLeadingFieldComparator) {
TupleLeadingFieldComparator<?, ?> tupleComp2 = (TupleLeadingFieldComparator<?, ?>) comparator2;
keyPos2 = new int[] {0};
comps2 = new TypeComparator[] { tupleComp2.getFieldComparator() };
}
else {
throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparator: " + comparator1);
}
return (TypePairComparator<T1, T2>) new TuplePairComparator<Tuple, Tuple>(keyPos1, keyPos2, comps1, comps2);
}
public TypePairComparator<T1, T2> createComparator12(
TypeComparator<T1> comparator1,
TypeComparator<T2> comparator2) {
return new GenericPairComparator<T1, T2>(comparator1, comparator2);
}
@SuppressWarnings("unchecked")
@Override
public TypePairComparator<T2, T1> createComparator21(TypeComparator<T1> comparator1, TypeComparator<T2> comparator2) {
if ((comparator1 instanceof TupleLeadingFieldComparator) && (comparator2 instanceof TupleLeadingFieldComparator)) {
TypeComparator<?> comp1 = ((TupleLeadingFieldComparator<?,?>) comparator1).getFieldComparator();
TypeComparator<?> comp2 = ((TupleLeadingFieldComparator<?,?>) comparator2).getFieldComparator();
return createLeadingFieldPairComp(comp2, comp1);
}
else {
int[] keyPos1;
int[] keyPos2;
TypeComparator<Object>[] comps1;
TypeComparator<Object>[] comps2;
// get info from first comparator
if (comparator1 instanceof TupleComparator) {
TupleComparator<?> tupleComp1 = (TupleComparator<?>) comparator1;
keyPos1 = tupleComp1.getKeyPositions();
comps1 = tupleComp1.getComparators();
}
else if (comparator1 instanceof TupleLeadingFieldComparator) {
TupleLeadingFieldComparator<?, ?> tupleComp1 = (TupleLeadingFieldComparator<?, ?>) comparator1;
keyPos1 = new int[] {0};
comps1 = new TypeComparator[] { tupleComp1.getFieldComparator() };
}
else {
throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparator: " + comparator1);
}
// get info from second comparator
if (comparator2 instanceof TupleComparator) {
TupleComparator<?> tupleComp2 = (TupleComparator<?>) comparator2;
keyPos2 = tupleComp2.getKeyPositions();
comps2 = tupleComp2.getComparators();
}
else if (comparator2 instanceof TupleLeadingFieldComparator) {
TupleLeadingFieldComparator<?, ?> tupleComp2 = (TupleLeadingFieldComparator<?, ?>) comparator2;
keyPos2 = new int[] {0};
comps2 = new TypeComparator[] { tupleComp2.getFieldComparator() };
}
else {
throw new IllegalArgumentException("Cannot instantiate pair comparator from the given comparator: " + comparator1);
}
return (TypePairComparator<T2, T1>) new TuplePairComparator<Tuple, Tuple>(keyPos2, keyPos1, comps2, comps1);
}
}
private static <K, T1 extends Tuple, T2 extends Tuple> TupleLeadingFieldPairComparator<K, T1, T2> createLeadingFieldPairComp(
TypeComparator<?> comp1, TypeComparator<?> comp2)
{
@SuppressWarnings("unchecked")
TypeComparator<K> c1 = (TypeComparator<K>) comp1;
@SuppressWarnings("unchecked")
TypeComparator<K> c2 = (TypeComparator<K>) comp2;
return new TupleLeadingFieldPairComparator<K, T1, T2>(c1, c2);
public TypePairComparator<T2, T1> createComparator21(
TypeComparator<T1> comparator1,
TypeComparator<T2> comparator2) {
return new GenericPairComparator<T2, T1>(comparator2, comparator1);
}
}
}
\ No newline at end of file
......@@ -18,135 +18,33 @@
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.KeyFieldOutOfBoundsException;
import org.apache.flink.types.NullFieldException;
import org.apache.flink.types.NullKeyFieldException;
public final class TupleComparator<T extends Tuple> extends TypeComparator<T> implements java.io.Serializable {
public final class TupleComparator<T extends Tuple> extends TupleComparatorBase<T> {
private static final long serialVersionUID = 1L;
/** key positions describe which fields are keys in what order */
private final int[] keyPositions;
/** comparators for the key fields, in the same order as the key fields */
private final TypeComparator<Object>[] comparators;
private final Object[] extractedKeys;
/** serializer factories to duplicate non thread-safe serializers */
private final TypeSerializerFactory<Object>[] serializerFactories;
private final int[] normalizedKeyLengths;
private final int numLeadingNormalizableKeys;
private final int normalizableKeyPrefixLen;
private final boolean invertNormKey;
/** serializers to deserialize the first n fields for comparison */
private transient TypeSerializer<Object>[] serializers;
// cache for the deserialized field objects
private transient Object[] deserializedFields1;
private transient Object[] deserializedFields2;
@SuppressWarnings("unchecked")
public TupleComparator(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
// set the default utils
this.keyPositions = keyPositions;
this.comparators = (TypeComparator<Object>[]) comparators;
this.serializers = (TypeSerializer<Object>[]) serializers;
// set the serializer factories.
this.serializerFactories = new TypeSerializerFactory[this.serializers.length];
for (int i = 0; i < serializers.length; i++) {
this.serializerFactories[i] = this.serializers[i].isStateful() ?
new RuntimeStatefulSerializerFactory<Object>(this.serializers[i], Object.class) :
new RuntimeStatelessSerializerFactory<Object>(this.serializers[i], Object.class);
}
// set up auxiliary fields for normalized key support
this.normalizedKeyLengths = new int[keyPositions.length];
int nKeys = 0;
int nKeyLen = 0;
boolean inverted = false;
for (int i = 0; i < this.keyPositions.length; i++) {
TypeComparator<?> k = this.comparators[i];
// as long as the leading keys support normalized keys, we can build up the composite key
if (k.supportsNormalizedKey()) {
if (i == 0) {
// the first comparator decides whether we need to invert the key direction
inverted = k.invertNormalizedKey();
}
else if (k.invertNormalizedKey() != inverted) {
// if a successor does not agree on the inversion direction, it cannot be part of the normalized key
break;
}
nKeys++;
final int len = k.getNormalizeKeyLen();
if (len < 0) {
throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
}
this.normalizedKeyLengths[i] = len;
nKeyLen += len;
if (nKeyLen < 0) {
// overflow, which means we are out of budget for normalized key space anyways
nKeyLen = Integer.MAX_VALUE;
break;
}
} else {
break;
}
}
this.numLeadingNormalizableKeys = nKeys;
this.normalizableKeyPrefixLen = nKeyLen;
this.invertNormKey = inverted;
super(keyPositions, comparators, serializers);
extractedKeys = new Object[keyPositions.length];
}
@SuppressWarnings("unchecked")
private TupleComparator(TupleComparator<T> toClone) {
// copy fields and serializer factories
this.keyPositions = toClone.keyPositions;
this.serializerFactories = toClone.serializerFactories;
this.comparators = new TypeComparator[toClone.comparators.length];
for (int i = 0; i < toClone.comparators.length; i++) {
this.comparators[i] = toClone.comparators[i].duplicate();
}
this.normalizedKeyLengths = toClone.normalizedKeyLengths;
this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
this.invertNormKey = toClone.invertNormKey;
}
// --------------------------------------------------------------------------------------------
// Comparator Methods
// --------------------------------------------------------------------------------------------
protected int[] getKeyPositions() {
return this.keyPositions;
}
protected TypeComparator<Object>[] getComparators() {
return this.comparators;
super(toClone);
extractedKeys = new Object[keyPositions.length];
}
// --------------------------------------------------------------------------------------------
......@@ -207,28 +105,6 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
}
@Override
public int compareToReference(TypeComparator<T> referencedComparator) {
TupleComparator<T> other = (TupleComparator<T>) referencedComparator;
int i = 0;
try {
for (; i < this.keyPositions.length; i++) {
int cmp = this.comparators[i].compareToReference(other.comparators[i]);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
catch (NullPointerException npex) {
throw new NullKeyFieldException(keyPositions[i]);
}
catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
}
@Override
public int compare(T first, T second) {
......@@ -252,132 +128,33 @@ public final class TupleComparator<T extends Tuple> extends TypeComparator<T> im
}
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
if (deserializedFields1 == null) {
instantiateDeserializationUtils();
}
int i = 0;
try {
for (; i < serializers.length; i++) {
deserializedFields1[i] = serializers[i].deserialize(deserializedFields1[i], firstSource);
deserializedFields2[i] = serializers[i].deserialize(deserializedFields2[i], secondSource);
}
for (i = 0; i < keyPositions.length; i++) {
int keyPos = keyPositions[i];
int cmp = comparators[i].compare(deserializedFields1[keyPos], deserializedFields2[keyPos]);
if (cmp != 0) {
return cmp;
}
}
return 0;
} catch (NullPointerException npex) {
throw new NullKeyFieldException(keyPositions[i]);
} catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
}
@Override
public boolean supportsNormalizedKey() {
return this.numLeadingNormalizableKeys > 0;
}
@Override
public int getNormalizeKeyLen() {
return this.normalizableKeyPrefixLen;
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return this.numLeadingNormalizableKeys < this.keyPositions.length ||
this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
this.normalizableKeyPrefixLen > keyBytes;
}
@Override
public void putNormalizedKey(T value, MemorySegment target, int offset, int numBytes) {
int i = 0;
try {
for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++)
{
int len = this.normalizedKeyLengths[i];
for (; i < this.numLeadingNormalizableKeys && numBytes > 0; i++) {
int len = this.normalizedKeyLengths[i];
len = numBytes >= len ? len : numBytes;
this.comparators[i].putNormalizedKey(value.getFieldNotNull(this.keyPositions[i]), target, offset, len);
numBytes -= len;
offset += len;
}
}
catch (NullFieldException nfex) {
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
}
catch (NullPointerException npex) {
} catch (NullPointerException npex) {
throw new NullKeyFieldException(this.keyPositions[i]);
}
}
@Override
public boolean invertNormalizedKey() {
return this.invertNormKey;
}
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
public Object[] extractKeys(T record) {
for (int i = 0; i < keyPositions.length; i++) {
extractedKeys[i] = record.getField(keyPositions[i]);
}
return extractedKeys;
}
@Override
public TupleComparator<T> duplicate() {
public TypeComparator<T> duplicate() {
return new TupleComparator<T>(this);
}
// --------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
private final void instantiateDeserializationUtils() {
if (this.serializers == null) {
this.serializers = new TypeSerializer[this.serializerFactories.length];
for (int i = 0; i < this.serializers.length; i++) {
this.serializers[i] = this.serializerFactories[i].getSerializer();
}
}
this.deserializedFields1 = new Object[this.serializers.length];
this.deserializedFields2 = new Object[this.serializers.length];
for (int i = 0; i < this.serializers.length; i++) {
this.deserializedFields1[i] = this.serializers[i].createInstance();
this.deserializedFields2[i] = this.serializers[i].createInstance();
}
}
// --------------------------------------------------------------------------------------------
/**
* A sequence of prime numbers to be used for salting the computed hash values.
* Based on some empirical evidence, we are using a 32-element subsequence of the
* OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
*
* @see: http://en.wikipedia.org/wiki/List_of_prime_numbers
* @see: http://oeis.org/A068652
*/
private static final int[] HASH_SALT = new int[] {
73 , 79 , 97 , 113 , 131 , 197 , 199 , 311 ,
337 , 373 , 719 , 733 , 919 , 971 , 991 , 1193 ,
1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 ,
19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.KeyFieldOutOfBoundsException;
import org.apache.flink.types.NullKeyFieldException;
import java.io.IOException;
public abstract class TupleComparatorBase<T> extends TypeComparator<T> implements java.io.Serializable {
/** key positions describe which fields are keys in what order */
protected int[] keyPositions;
/** comparators for the key fields, in the same order as the key fields */
protected TypeComparator[] comparators;
/** serializer factories to duplicate non thread-safe serializers */
protected TypeSerializerFactory<Object>[] serializerFactories;
protected int[] normalizedKeyLengths;
protected int numLeadingNormalizableKeys;
protected int normalizableKeyPrefixLen;
protected boolean invertNormKey;
/** serializers to deserialize the first n fields for comparison */
protected transient TypeSerializer[] serializers;
// cache for the deserialized field objects
protected transient Object[] deserializedFields1;
protected transient Object[] deserializedFields2;
@SuppressWarnings("unchecked")
public TupleComparatorBase(int[] keyPositions, TypeComparator<?>[] comparators, TypeSerializer<?>[] serializers) {
// set the default utils
this.keyPositions = keyPositions;
this.comparators = (TypeComparator<Object>[]) comparators;
this.serializers = (TypeSerializer<Object>[]) serializers;
// set the serializer factories.
this.serializerFactories = new TypeSerializerFactory[this.serializers.length];
for (int i = 0; i < serializers.length; i++) {
this.serializerFactories[i] = this.serializers[i].isStateful() ?
new RuntimeStatefulSerializerFactory<Object>(this.serializers[i], Object.class) :
new RuntimeStatelessSerializerFactory<Object>(this.serializers[i], Object.class);
}
// set up auxiliary fields for normalized key support
this.normalizedKeyLengths = new int[keyPositions.length];
int nKeys = 0;
int nKeyLen = 0;
boolean inverted = false;
for (int i = 0; i < this.keyPositions.length; i++) {
TypeComparator<?> k = this.comparators[i];
// as long as the leading keys support normalized keys, we can build up the composite key
if (k.supportsNormalizedKey()) {
if (i == 0) {
// the first comparator decides whether we need to invert the key direction
inverted = k.invertNormalizedKey();
}
else if (k.invertNormalizedKey() != inverted) {
// if a successor does not agree on the inversion direction, it cannot be part of the normalized key
break;
}
nKeys++;
final int len = k.getNormalizeKeyLen();
if (len < 0) {
throw new RuntimeException("Comparator " + k.getClass().getName() + " specifies an invalid length for the normalized key: " + len);
}
this.normalizedKeyLengths[i] = len;
nKeyLen += len;
if (nKeyLen < 0) {
// overflow, which means we are out of budget for normalized key space anyways
nKeyLen = Integer.MAX_VALUE;
break;
}
} else {
break;
}
}
this.numLeadingNormalizableKeys = nKeys;
this.normalizableKeyPrefixLen = nKeyLen;
this.invertNormKey = inverted;
}
@SuppressWarnings("unchecked")
protected TupleComparatorBase(TupleComparatorBase<T> toClone) {
privateDuplicate(toClone);
}
// We need this because we cannot call the cloning constructor from the
// ScalaTupleComparator
protected void privateDuplicate(TupleComparatorBase<T> toClone) {
// copy fields and serializer factories
this.keyPositions = toClone.keyPositions;
this.serializerFactories = toClone.serializerFactories;
this.comparators = new TypeComparator[toClone.comparators.length];
for (int i = 0; i < toClone.comparators.length; i++) {
this.comparators[i] = toClone.comparators[i].duplicate();
}
this.normalizedKeyLengths = toClone.normalizedKeyLengths;
this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
this.invertNormKey = toClone.invertNormKey;
}
// --------------------------------------------------------------------------------------------
// Comparator Methods
// --------------------------------------------------------------------------------------------
protected int[] getKeyPositions() {
return this.keyPositions;
}
public TypeComparator[] getComparators() {
return this.comparators;
}
// --------------------------------------------------------------------------------------------
// Comparator Methods
// --------------------------------------------------------------------------------------------
@Override
public int compareToReference(TypeComparator<T> referencedComparator) {
TupleComparatorBase<T> other = (TupleComparatorBase<T>) referencedComparator;
int i = 0;
try {
for (; i < this.keyPositions.length; i++) {
int cmp = this.comparators[i].compareToReference(other.comparators[i]);
if (cmp != 0) {
return cmp;
}
}
return 0;
}
catch (NullPointerException npex) {
throw new NullKeyFieldException(keyPositions[i]);
}
catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
if (deserializedFields1 == null) {
instantiateDeserializationUtils();
}
int i = 0;
try {
for (; i < serializers.length; i++) {
deserializedFields1[i] = serializers[i].deserialize(deserializedFields1[i], firstSource);
deserializedFields2[i] = serializers[i].deserialize(deserializedFields2[i], secondSource);
}
for (i = 0; i < keyPositions.length; i++) {
int keyPos = keyPositions[i];
int cmp = comparators[i].compare(deserializedFields1[keyPos], deserializedFields2[keyPos]);
if (cmp != 0) {
return cmp;
}
}
return 0;
} catch (NullPointerException npex) {
throw new NullKeyFieldException(keyPositions[i]);
} catch (IndexOutOfBoundsException iobex) {
throw new KeyFieldOutOfBoundsException(keyPositions[i]);
}
}
@Override
public boolean supportsNormalizedKey() {
return this.numLeadingNormalizableKeys > 0;
}
@Override
public int getNormalizeKeyLen() {
return this.normalizableKeyPrefixLen;
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return this.numLeadingNormalizableKeys < this.keyPositions.length ||
this.normalizableKeyPrefixLen == Integer.MAX_VALUE ||
this.normalizableKeyPrefixLen > keyBytes;
}
@Override
public boolean invertNormalizedKey() {
return this.invertNormKey;
}
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
// --------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
private final void instantiateDeserializationUtils() {
if (this.serializers == null) {
this.serializers = new TypeSerializer[this.serializerFactories.length];
for (int i = 0; i < this.serializers.length; i++) {
this.serializers[i] = this.serializerFactories[i].getSerializer();
}
}
this.deserializedFields1 = new Object[this.serializers.length];
this.deserializedFields2 = new Object[this.serializers.length];
for (int i = 0; i < this.serializers.length; i++) {
this.deserializedFields1[i] = this.serializers[i].createInstance();
this.deserializedFields2[i] = this.serializers[i].createInstance();
}
}
// --------------------------------------------------------------------------------------------
/**
* A sequence of prime numbers to be used for salting the computed hash values.
* Based on some empirical evidence, we are using a 32-element subsequence of the
* OEIS sequence #A068652 (numbers such that every cyclic permutation is a prime).
*
* @see: http://en.wikipedia.org/wiki/List_of_prime_numbers
* @see: http://oeis.org/A068652
*/
protected static final int[] HASH_SALT = new int[] {
73 , 79 , 97 , 113 , 131 , 197 , 199 , 311 ,
337 , 373 , 719 , 733 , 919 , 971 , 991 , 1193 ,
1931 , 3119 , 3779 , 7793 , 7937 , 9311 , 9377 , 11939 ,
19391, 19937, 37199, 39119, 71993, 91193, 93719, 93911 };
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NullFieldException;
import org.apache.flink.types.NullKeyFieldException;
public final class TupleLeadingFieldComparator<T extends Tuple, K> extends TypeComparator<T>
implements java.io.Serializable
{
private static final long serialVersionUID = 1L;
private final TypeComparator<K> comparator;
public TupleLeadingFieldComparator(TypeComparator<K> comparator) {
this.comparator = comparator;
}
public TypeComparator<K> getComparator() {
return this.comparator;
}
@Override
public int hash(T value) {
try {
return comparator.hash(value.<K> getFieldNotNull(0));
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
}
}
@Override
public void setReference(T toCompare) {
try {
this.comparator.setReference(toCompare.<K> getFieldNotNull(0));
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
}
}
@Override
public boolean equalToReference(T candidate) {
try {
return this.comparator.equalToReference(candidate
.<K> getFieldNotNull(0));
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
}
}
@SuppressWarnings("unchecked")
@Override
public int compareToReference(TypeComparator<T> referencedComparator) {
return this.comparator.compareToReference(((TupleLeadingFieldComparator<T, K>) referencedComparator).comparator);
}
@Override
public int compare(T first, T second) {
try {
return this.comparator.compare(first.<K> getFieldNotNull(0),
second.<K> getFieldNotNull(0));
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
}
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
return this.comparator.compare(firstSource, secondSource);
}
@Override
public boolean supportsNormalizedKey() {
return this.comparator.supportsNormalizedKey();
}
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public int getNormalizeKeyLen() {
return this.comparator.getNormalizeKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return this.comparator.isNormalizedKeyPrefixOnly(keyBytes);
}
@Override
public void putNormalizedKey(T record, MemorySegment target, int offset, int numBytes) {
try {
this.comparator.putNormalizedKey(record.<K> getFieldNotNull(0),
target, offset, numBytes);
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
}
}
@Override
public void writeWithKeyNormalization(T record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public boolean invertNormalizedKey() {
return this.comparator.invertNormalizedKey();
}
@Override
public TypeComparator<T> duplicate() {
return new TupleLeadingFieldComparator<T, K>(comparator.duplicate());
}
// --------------------------------------------------------------------------------------------
protected TypeComparator<K> getFieldComparator() {
return this.comparator;
}
}
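A short usage sketch of the comparator above; the example class, the tuple values, and the main method are invented for illustration. Every call is delegated to the wrapped comparator for field 0, so only the Integer field influences the result.
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleLeadingFieldComparator;

public class TupleLeadingFieldComparatorExample {
	public static void main(String[] args) {
		TupleLeadingFieldComparator<Tuple2<Integer, String>, Integer> comparator =
				new TupleLeadingFieldComparator<Tuple2<Integer, String>, Integer>(new IntComparator(true));

		Tuple2<Integer, String> a = new Tuple2<Integer, String>(3, "foo");
		Tuple2<Integer, String> b = new Tuple2<Integer, String>(7, "bar");

		// Only the leading field (index 0) is consulted; the String field is ignored.
		System.out.println(comparator.compare(a, b)); // negative, since 3 < 7
	}
}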
......@@ -19,7 +19,6 @@
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
......@@ -28,48 +27,15 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.NullFieldException;
public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
public final class TupleSerializer<T extends Tuple> extends TupleSerializerBase<T> {
private static final long serialVersionUID = 1L;
private final Class<T> tupleClass;
private final TypeSerializer<Object>[] fieldSerializers;
private final int arity;
private final boolean stateful;
@SuppressWarnings("unchecked")
public TupleSerializer(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
this.tupleClass = tupleClass;
this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
this.arity = fieldSerializers.length;
boolean stateful = false;
for (TypeSerializer<?> ser : fieldSerializers) {
if (ser.isStateful()) {
stateful = true;
break;
}
}
this.stateful = stateful;
}
@Override
public boolean isImmutableType() {
return false;
super(tupleClass, fieldSerializers);
}
@Override
public boolean isStateful() {
return this.stateful;
}
@Override
public T createInstance() {
try {
......@@ -96,12 +62,6 @@ public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
return reuse;
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(T value, DataOutputView target) throws IOException {
for (int i = 0; i < arity; i++) {
......@@ -122,33 +82,4 @@ public final class TupleSerializer<T extends Tuple> extends TypeSerializer<T> {
}
return reuse;
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
for (int i = 0; i < arity; i++) {
fieldSerializers[i].copy(source, target);
}
}
@Override
public int hashCode() {
int hashCode = arity * 47;
for (TypeSerializer<?> ser : this.fieldSerializers) {
hashCode = (hashCode << 7) | (hashCode >>> -7);
hashCode += ser.hashCode();
}
return hashCode;
}
@Override
public boolean equals(Object obj) {
if (obj != null && obj instanceof TupleSerializer) {
TupleSerializer<?> otherTS = (TupleSerializer<?>) obj;
return (otherTS.tupleClass == this.tupleClass) &&
Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
}
else {
return false;
}
}
}
......@@ -18,53 +18,83 @@
package org.apache.flink.api.java.typeutils.runtime;
import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.types.NullFieldException;
import org.apache.flink.types.NullKeyFieldException;
import java.io.IOException;
import java.util.Arrays;
public class TupleLeadingFieldPairComparator<K, T1 extends Tuple, T2 extends Tuple> extends TypePairComparator<T1, T2> implements Serializable {
public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 1L;
protected final Class<T> tupleClass;
protected final TypeSerializer<Object>[] fieldSerializers;
protected final int arity;
protected final boolean stateful;
@SuppressWarnings("unchecked")
public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
this.tupleClass = tupleClass;
this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
this.arity = fieldSerializers.length;
boolean stateful = false;
for (TypeSerializer<?> ser : fieldSerializers) {
if (ser.isStateful()) {
stateful = true;
break;
}
}
this.stateful = stateful;
}
private final TypeComparator<K> comparator1;
private final TypeComparator<K> comparator2;
public TupleLeadingFieldPairComparator(TypeComparator<K> comparator1, TypeComparator<K> comparator2) {
this.comparator1 = comparator1;
this.comparator2 = comparator2;
@Override
public boolean isImmutableType() {
return false;
}
@Override
public void setReference(T1 reference) {
try {
this.comparator1.setReference(reference.<K> getFieldNotNull(0));
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
}
public boolean isStateful() {
return this.stateful;
}
@Override
public boolean equalToReference(T2 candidate) {
try {
return this.comparator1.equalToReference(candidate
.<K> getFieldNotNull(0));
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
}
public int getLength() {
return -1;
}
@Override
public int compareToReference(T2 candidate) {
try {
this.comparator2.setReference(candidate.<K> getFieldNotNull(0));
return this.comparator1.compareToReference(this.comparator2);
} catch (NullFieldException nfex) {
throw new NullKeyFieldException(nfex);
public void copy(DataInputView source, DataOutputView target) throws IOException {
for (int i = 0; i < arity; i++) {
fieldSerializers[i].copy(source, target);
}
}
@Override
public int hashCode() {
int hashCode = arity * 47;
for (TypeSerializer<?> ser : this.fieldSerializers) {
hashCode = (hashCode << 7) | (hashCode >>> -7);
hashCode += ser.hashCode();
}
return hashCode;
}
@Override
public boolean equals(Object obj) {
if (obj != null && obj instanceof TupleSerializerBase) {
TupleSerializerBase<?> otherTS = (TupleSerializerBase<?>) obj;
return (otherTS.tupleClass == this.tupleClass) &&
Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
}
else {
return false;
}
}
}
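As a usage illustration of this base class through its concrete TupleSerializer subclass: a hedged sketch only, with an invented example class and values; the tests further below construct a serializer in a similar way.
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;

public class TupleSerializerExample {
	@SuppressWarnings("unchecked")
	public static void main(String[] args) {
		TupleSerializer<Tuple2<Integer, String>> serializer =
				new TupleSerializer<Tuple2<Integer, String>>(
						(Class<Tuple2<Integer, String>>) (Class<?>) Tuple2.class,
						new TypeSerializer[] { IntSerializer.INSTANCE, StringSerializer.INSTANCE });

		// copy(from, reuse) runs through the per-field serializers held by the base class.
		Tuple2<Integer, String> copy = serializer.copy(
				new Tuple2<Integer, String>(42, "answer"), serializer.createInstance());
		System.out.println(copy);
	}
}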
......@@ -46,8 +46,11 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
private transient T tempReference;
private transient Kryo kryo;
private final Comparable[] extractedKey = new Comparable[1];
private final TypeComparator[] comparators = new TypeComparator[] {this};
public ValueComparator(boolean ascending, Class<T> type) {
this.type = type;
this.ascendingComparison = ascending;
......@@ -83,7 +86,7 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
if (reference == null) {
reference = InstantiationUtil.instantiate(type, Value.class);
}
......@@ -140,6 +143,17 @@ public class ValueComparator<T extends Value & Comparable<T>> extends TypeCompar
this.kryo.register(type);
}
}
@Override
public Object[] extractKeys(T record) {
extractedKey[0] = record;
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
......
......@@ -43,7 +43,11 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
private transient T tempReference;
private transient Kryo kryo;
private final Comparable[] extractedKey = new Comparable[1];
private final TypeComparator[] comparators = new TypeComparator[] {this};
public WritableComparator(boolean ascending, Class<T> type) {
this.type = type;
this.ascendingComparison = ascending;
......@@ -79,7 +83,7 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) throws IOException {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
ensureReferenceInstantiated();
ensureTempReferenceInstantiated();
......@@ -123,6 +127,16 @@ public class WritableComparator<T extends Writable & Comparable<T>> extends Type
public TypeComparator<T> duplicate() {
return new WritableComparator<T>(ascendingComparison, type);
}
@Override
public Object[] extractKeys(T record) {
extractedKey[0] = record;
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
......
......@@ -264,7 +264,7 @@ public final class RecordComparator extends TypeComparator<Record> {
}
@Override
public int compare(DataInputView source1, DataInputView source2) throws IOException {
public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
this.temp1.read(source1);
this.temp2.read(source2);
......@@ -390,6 +390,18 @@ public final class RecordComparator extends TypeComparator<Record> {
}
}
@Override
public Object[] extractKeys(Record record) {
throw new UnsupportedOperationException("Record does not support extactKeys and " +
"getComparators. This cannot be used with the GenericPairComparator.");
}
@Override
public TypeComparator[] getComparators() {
throw new UnsupportedOperationException("Record does not support extactKeys and " +
"getComparators. This cannot be used with the GenericPairComparator.");
}
@Override
public boolean supportsCompareAgainstReference() {
return true;
......
......@@ -146,7 +146,7 @@ public class TypeInfoParserTest {
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, ((TupleTypeInfo<?>)ti).getTypeAt(1));
ti = TypeInfoParser.parse("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>");
Assert.assertEquals("Tuple3<Tuple1<String>, Tuple1<Integer>, Tuple2<Long, Long>>", ti.toString());
Assert.assertEquals("Java Tuple3<Java Tuple1<String>, Java Tuple1<Integer>, Java Tuple2<Long, Long>>", ti.toString());
}
@Test
......@@ -190,13 +190,13 @@ public class TypeInfoParserTest {
Assert.assertTrue(((ObjectArrayTypeInfo<?, ?>) ti2).getComponentInfo() instanceof TupleTypeInfo);
TypeInformation<?> ti3 = TypeInfoParser.parse("Tuple2<Integer[],Double>[]");
Assert.assertEquals("ObjectArrayTypeInfo<Tuple2<BasicArrayTypeInfo<Integer>, Double>>", ti3.toString());
Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple2<BasicArrayTypeInfo<Integer>, Double>>", ti3.toString());
}
@Test
public void testLargeMixedTuple() {
TypeInformation<?> ti = TypeInfoParser.parse("org.apache.flink.api.java.tuple.Tuple4<Double,java.lang.Class[],StringValue,Tuple1<int>>[]");
Assert.assertEquals("ObjectArrayTypeInfo<Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<org.apache.flink.types.StringValue>, Tuple1<Integer>>>", ti.toString());
Assert.assertEquals("ObjectArrayTypeInfo<Java Tuple4<Double, ObjectArrayTypeInfo<GenericType<java.lang.Class>>, ValueType<org.apache.flink.types.StringValue>, Java Tuple1<Integer>>>", ti.toString());
}
@Test
......
......@@ -19,16 +19,18 @@
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleComparator;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.runtime.TuplePairComparator;
import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase;
public class TuplePairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
public class GenericPairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
@SuppressWarnings("unchecked")
private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
......@@ -56,19 +58,28 @@ public class TuplePairComparatorTest extends TuplePairComparatorTestBase<Tuple3<
@SuppressWarnings("unchecked")
@Override
protected TuplePairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
return new TuplePairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(
new int[]{0, 2},
new int[]{0, 3},
new TypeComparator[]{
new IntComparator(ascending),
new DoubleComparator(ascending)
},
new TypeComparator[]{
new IntComparator(ascending),
new DoubleComparator(ascending)
}
);
protected GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
int[] fields1 = new int[]{0, 2};
int[] fields2 = new int[]{0, 3};
TypeComparator[] comps1 = new TypeComparator[]{
new IntComparator(ascending),
new DoubleComparator(ascending)
};
TypeComparator[] comps2 = new TypeComparator[]{
new IntComparator(ascending),
new DoubleComparator(ascending)
};
TypeSerializer[] sers1 = new TypeSerializer[]{
IntSerializer.INSTANCE,
DoubleSerializer.INSTANCE
};
TypeSerializer[] sers2= new TypeSerializer[]{
IntSerializer.INSTANCE,
DoubleSerializer.INSTANCE
};
TypeComparator<Tuple3<Integer, String, Double>> comp1 = new TupleComparator<Tuple3<Integer, String, Double>>(fields1, comps1, sers1);
TypeComparator<Tuple4<Integer, Float, Long, Double>> comp2 = new TupleComparator<Tuple4<Integer, Float, Long, Double>>(fields2, comps2, sers2);
return new GenericPairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(comp1, comp2);
}
@Override
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import static org.junit.Assert.assertEquals;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.runtime.TupleLeadingFieldComparator;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
public class TupleLeadingFieldComparatorTest extends ComparatorTestBase<Tuple3<Integer, String, Double>> {
@SuppressWarnings("unchecked")
Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
new Tuple3<Integer, String, Double>(4, "hello", 20.0),
new Tuple3<Integer, String, Double>(5, "hello", 23.2),
new Tuple3<Integer, String, Double>(6, "world", 20.0),
new Tuple3<Integer, String, Double>(7, "hello", 20.0),
new Tuple3<Integer, String, Double>(8, "hello", 23.2),
new Tuple3<Integer, String, Double>(9, "world", 20.0),
new Tuple3<Integer, String, Double>(10, "hello", 20.0),
new Tuple3<Integer, String, Double>(11, "hello", 23.2)
};
@Override
protected TupleLeadingFieldComparator<Tuple3<Integer, String, Double>, Integer> createComparator(boolean ascending) {
return new TupleLeadingFieldComparator<Tuple3<Integer,String,Double>, Integer>(new IntComparator(ascending));
}
@SuppressWarnings("unchecked")
@Override
protected TupleSerializer<Tuple3<Integer, String, Double>> createSerializer() {
return new TupleSerializer<Tuple3<Integer, String, Double>>(
(Class<Tuple3<Integer, String, Double>>) (Class<?>) Tuple3.class,
new TypeSerializer[]{
new IntSerializer(),
new StringSerializer(),
new DoubleSerializer()});
}
@Override
protected Tuple3<Integer, String, Double>[] getSortedTestData() {
return dataISD;
}
@Override
protected void deepEquals(String message, Tuple3<Integer, String, Double> should, Tuple3<Integer, String, Double> is) {
for (int x = 0; x < should.getArity(); x++) {
assertEquals(should.getField(x), is.getField(x));
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.runtime.TupleLeadingFieldPairComparator;
import org.apache.flink.api.java.typeutils.runtime.tuple.base.TuplePairComparatorTestBase;
public class TupleLeadingFieldPairComparatorTest extends TuplePairComparatorTestBase<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> {
@SuppressWarnings("unchecked")
private Tuple3<Integer, String, Double>[] dataISD = new Tuple3[]{
new Tuple3<Integer, String, Double>(4, "hello", 20.0),
new Tuple3<Integer, String, Double>(5, "world", 23.2),
new Tuple3<Integer, String, Double>(6, "hello", 18.0),
new Tuple3<Integer, String, Double>(7, "world", 19.2),
new Tuple3<Integer, String, Double>(8, "hello", 16.0),
new Tuple3<Integer, String, Double>(9, "world", 17.2),
new Tuple3<Integer, String, Double>(10, "hello", 14.0),
new Tuple3<Integer, String, Double>(11, "world", 15.2)
};
@SuppressWarnings("unchecked")
private Tuple4<Integer, Float, Long, Double>[] dataIDL = new Tuple4[]{
new Tuple4<Integer, Float, Long, Double>(4, 0.11f, 14L, 20.0),
new Tuple4<Integer, Float, Long, Double>(5, 0.221f, 15L, 23.2),
new Tuple4<Integer, Float, Long, Double>(6, 0.33f, 15L, 18.0),
new Tuple4<Integer, Float, Long, Double>(7, 0.44f, 20L, 19.2),
new Tuple4<Integer, Float, Long, Double>(8, 0.55f, 20L, 16.0),
new Tuple4<Integer, Float, Long, Double>(9, 0.66f, 29L, 17.2),
new Tuple4<Integer, Float, Long, Double>(10, 0.77f, 29L, 14.0),
new Tuple4<Integer, Float, Long, Double>(11, 0.88f, 34L, 15.2)
};
@Override
protected TypePairComparator<Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>> createComparator(boolean ascending) {
return new TupleLeadingFieldPairComparator<Integer, Tuple3<Integer, String, Double>, Tuple4<Integer, Float, Long, Double>>(
new IntComparator(ascending), new IntComparator(ascending));
}
@Override
protected Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]> getSortedTestData() {
return new Tuple2<Tuple3<Integer, String, Double>[], Tuple4<Integer, Float, Long, Double>[]>(dataISD, dataIDL);
}
}
......@@ -316,7 +316,7 @@ public final class NormalizedKeySorter<T> implements InMemorySorter<T>
this.recordBufferForComparison.setReadPosition(pointer2);
try {
return this.comparator.compare(this.recordBuffer, this.recordBufferForComparison);
return this.comparator.compareSerialized(this.recordBuffer, this.recordBufferForComparison);
} catch (IOException ioex) {
throw new RuntimeException("Error comparing two records.", ioex);
}
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
......@@ -31,6 +32,10 @@ public class IntListComparator extends TypeComparator<IntList> {
private int reference;
private Comparable[] extractedKey = new Comparable[1];
private final TypeComparator[] comparators = new TypeComparator[] {new IntComparator(true)};
@Override
public int hash(IntList record) {
return record.getKey() * 73;
......@@ -58,7 +63,7 @@ public class IntListComparator extends TypeComparator<IntList> {
}
@Override
public int compare(DataInputView source1, DataInputView source2) throws IOException {
public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
return source1.readInt() - source2.readInt();
}
......@@ -134,4 +139,15 @@ public class IntListComparator extends TypeComparator<IntList> {
public TypeComparator<IntList> duplicate() {
return new IntListComparator();
}
@Override
public Object[] extractKeys(IntList record) {
extractedKey[0] = record.getKey();
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
}
......@@ -22,6 +22,7 @@ package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
......@@ -32,7 +33,11 @@ public class IntPairComparator extends TypeComparator<IntPair> {
private static final long serialVersionUID = 1L;
private int reference;
private final Comparable[] extractedKey = new Comparable[1];
private final TypeComparator[] comparators = new TypeComparator[] {new IntComparator(true)};
@Override
public int hash(IntPair object) {
return object.getKey() * 73;
......@@ -60,7 +65,7 @@ public class IntPairComparator extends TypeComparator<IntPair> {
}
@Override
public int compare(DataInputView source1, DataInputView source2) throws IOException {
public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
return source1.readInt() - source2.readInt();
}
......@@ -112,6 +117,15 @@ public class IntPairComparator extends TypeComparator<IntPair> {
return new IntPairComparator();
}
@Override
public Object[] extractKeys(IntPair pair) {
extractedKey[0] = pair.getKey();
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
@Override
public boolean supportsSerializationWithKeyNormalization() {
return true;
......
......@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.testutils.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.StringComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
......@@ -32,6 +33,10 @@ public class StringPairComparator extends TypeComparator<StringPair> {
private String reference;
private Comparable[] extractedKey = new Comparable[1];
private final TypeComparator[] comparators = new TypeComparator[] {new StringComparator(true)};
@Override
public int hash(StringPair record) {
return record.getKey().hashCode();
......@@ -58,7 +63,7 @@ public class StringPairComparator extends TypeComparator<StringPair> {
}
@Override
public int compare(DataInputView firstSource, DataInputView secondSource)
public int compareSerialized(DataInputView firstSource, DataInputView secondSource)
throws IOException {
return StringValue.readString(firstSource).compareTo(StringValue.readString(secondSource));
}
......@@ -110,4 +115,14 @@ public class StringPairComparator extends TypeComparator<StringPair> {
public TypeComparator<StringPair> duplicate() {
return new StringPairComparator();
}
@Override
public Object[] extractKeys(StringPair record) {
extractedKey[0] = record.getKey();
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
}
......@@ -27,6 +27,7 @@ import java.io.PipedOutputStream;
import junit.framework.TestCase;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.junit.Assert;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
......@@ -381,6 +382,9 @@ public class OutputEmitterTest extends TestCase {
@SuppressWarnings("serial")
private static class TestIntComparator extends TypeComparator<Integer> {
private final Comparable[] extractedKey = new Comparable[1];
private TypeComparator[] comparators = new TypeComparator[]{new IntComparator(true)};
@Override
public int hash(Integer record) {
......@@ -402,7 +406,7 @@ public class OutputEmitterTest extends TestCase {
public int compare(Integer first, Integer second) { throw new UnsupportedOperationException(); }
@Override
public int compare(DataInputView firstSource, DataInputView secondSource) {
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) {
throw new UnsupportedOperationException();
}
......@@ -438,7 +442,17 @@ public class OutputEmitterTest extends TestCase {
@Override
public TypeComparator<Integer> duplicate() { throw new UnsupportedOperationException(); }
@Override
public Object[] extractKeys(Integer record) {
extractedKey[0] = record;
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
}
// @Test
......
......@@ -21,6 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
......@@ -30,7 +31,11 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
private static final long serialVersionUID = 1L;
private long reference;
private Comparable[] extractedKey = new Comparable[1];
private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
@Override
public int hash(VertexWithAdjacencyList record) {
final long value = record.getVertexID();
......@@ -61,7 +66,7 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
}
@Override
public int compare(DataInputView source1, DataInputView source2) throws IOException {
public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
final long diff = source1.readLong() - source2.readLong();
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
......@@ -129,4 +134,15 @@ public final class VertexWithAdjacencyListComparator extends TypeComparator<Vert
public VertexWithAdjacencyListComparator duplicate() {
return new VertexWithAdjacencyListComparator();
}
@Override
public Object[] extractKeys(VertexWithAdjacencyList record) {
extractedKey[0] = record.getVertexID();
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
}
......@@ -21,6 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
......@@ -30,7 +31,11 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
private static final long serialVersionUID = 1L;
private long reference;
private Comparable[] extractedKey = new Comparable[1];
private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
@Override
public int hash(VertexWithRankAndDangling record) {
final long value = record.getVertexID();
......@@ -61,7 +66,7 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
}
@Override
public int compare(DataInputView source1, DataInputView source2) throws IOException {
public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
final long diff = source1.readLong() - source2.readLong();
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
......@@ -134,4 +139,15 @@ public final class VertexWithRankAndDanglingComparator extends TypeComparator<Ve
public VertexWithRankAndDanglingComparator duplicate() {
return new VertexWithRankAndDanglingComparator();
}
@Override
public Object[] extractKeys(VertexWithRankAndDangling record) {
extractedKey[0] = record.getVertexID();
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
}
......@@ -21,6 +21,7 @@ package org.apache.flink.test.iterative.nephele.customdanglingpagerank.types;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.base.LongComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
......@@ -30,7 +31,11 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
private static final long serialVersionUID = 1L;
private long reference;
private Comparable[] extractedKey = new Comparable[1];
private TypeComparator[] comparators = new TypeComparator[]{new LongComparator(true)};
@Override
public int hash(VertexWithRank record) {
final long value = record.getVertexID();
......@@ -61,7 +66,7 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
}
@Override
public int compare(DataInputView source1, DataInputView source2) throws IOException {
public int compareSerialized(DataInputView source1, DataInputView source2) throws IOException {
final long diff = source1.readLong() - source2.readLong();
return diff < 0 ? -1 : diff > 0 ? 1 : 0;
}
......@@ -132,4 +137,15 @@ public final class VertexWithRankComparator extends TypeComparator<VertexWithRan
public VertexWithRankComparator duplicate() {
return new VertexWithRankComparator();
}
@Override
public Object[] extractKeys(VertexWithRank record) {
extractedKey[0] = record.getVertexID();
return extractedKey;
}
@Override
public TypeComparator[] getComparators() {
return comparators;
}
}