diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..0a20d1dcfeea48756e2ff97713dc34778a166113 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.BooleanValue; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for BooleanValue based on CopyableValueComparator. + */ +@Internal +public class BooleanValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final BooleanValue reference = new BooleanValue(); + + private final BooleanValue tempReference = new BooleanValue(); + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public BooleanValueComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(BooleanValue record) { + return record.hashCode(); + } + + @Override + public void setReference(BooleanValue toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(BooleanValue candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + BooleanValue otherRef = ((BooleanValueComparator) referencedComparator).reference; + int comp = otherRef.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(BooleanValue first, BooleanValue second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + reference.read(firstSource); + tempReference.read(secondSource); + int comp = reference.compareTo(tempReference); + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(BooleanValue.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(BooleanValue record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator duplicate() { + return new BooleanValueComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(BooleanValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public BooleanValue readWithKeyDenormalization(BooleanValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..a9a067a3b747b7850bfb6974a1c52b485835a9a3 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueComparator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.ByteValue; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for ByteValue based on CopyableValueComparator. + */ +@Internal +public class ByteValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final ByteValue reference = new ByteValue(); + + private final ByteValue tempReference = new ByteValue(); + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public ByteValueComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(ByteValue record) { + return record.hashCode(); + } + + @Override + public void setReference(ByteValue toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(ByteValue candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + ByteValue otherRef = ((ByteValueComparator) referencedComparator).reference; + int comp = otherRef.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(ByteValue first, ByteValue second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + reference.read(firstSource); + tempReference.read(secondSource); + int comp = reference.compareTo(tempReference); + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(ByteValue.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(ByteValue record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator duplicate() { + return new ByteValueComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(ByteValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteValue readWithKeyDenormalization(ByteValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..892438f49e3fa3480f5597135d749deaea45f244 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueComparator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.CharValue; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for CharValue based on CopyableValueComparator. + */ +@Internal +public class CharValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final CharValue reference = new CharValue(); + + private final CharValue tempReference = new CharValue(); + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public CharValueComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(CharValue record) { + return record.hashCode(); + } + + @Override + public void setReference(CharValue toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(CharValue candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + CharValue otherRef = ((CharValueComparator) referencedComparator).reference; + int comp = otherRef.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(CharValue first, CharValue second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + reference.read(firstSource); + tempReference.read(secondSource); + int comp = reference.compareTo(tempReference); + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(CharValue.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(CharValue record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator duplicate() { + return new CharValueComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(CharValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CharValue readWithKeyDenormalization(CharValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..5e566f457464fa90c2a5627551d722ff3d24013b --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparator.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.DoubleValue; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for DoubleValue based on CopyableValueComparator. + */ +@Internal +public class DoubleValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final DoubleValue reference = new DoubleValue(); + + private final DoubleValue tempReference = new DoubleValue(); + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public DoubleValueComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(DoubleValue record) { + return record.hashCode(); + } + + @Override + public void setReference(DoubleValue toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(DoubleValue candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + DoubleValue otherRef = ((DoubleValueComparator) referencedComparator).reference; + int comp = otherRef.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(DoubleValue first, DoubleValue second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + reference.read(firstSource); + tempReference.read(secondSource); + int comp = reference.compareTo(tempReference); + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(DoubleValue.class); + } + + @Override + public int getNormalizeKeyLen() { + NormalizableKey key = (NormalizableKey) reference; + return key.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(DoubleValue record, MemorySegment target, int offset, int numBytes) { + NormalizableKey key = (NormalizableKey) record; + key.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator duplicate() { + return new DoubleValueComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(DoubleValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public DoubleValue readWithKeyDenormalization(DoubleValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..90077c67d43a4d7ad9781ddc2b0c7a7028a2e937 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueComparator.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.FloatValue; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for FloatValue based on CopyableValueComparator. + */ +@Internal +public class FloatValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final FloatValue reference = new FloatValue(); + + private final FloatValue tempReference = new FloatValue(); + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public FloatValueComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(FloatValue record) { + return record.hashCode(); + } + + @Override + public void setReference(FloatValue toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(FloatValue candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + FloatValue otherRef = ((FloatValueComparator) referencedComparator).reference; + int comp = otherRef.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(FloatValue first, FloatValue second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + reference.read(firstSource); + tempReference.read(secondSource); + int comp = reference.compareTo(tempReference); + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(FloatValue.class); + } + + @Override + public int getNormalizeKeyLen() { + NormalizableKey key = (NormalizableKey) reference; + return key.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(FloatValue record, MemorySegment target, int offset, int numBytes) { + NormalizableKey key = (NormalizableKey) record; + key.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator duplicate() { + return new FloatValueComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(FloatValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public FloatValue readWithKeyDenormalization(FloatValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..e18e7dcd55ec5d5b32ff5fa8fde502701080c80c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueComparator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.IntValue; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for IntValue based on CopyableValueComparator. + */ +@Internal +public class IntValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final IntValue reference = new IntValue(); + + private final IntValue tempReference = new IntValue(); + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public IntValueComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(IntValue record) { + return record.hashCode(); + } + + @Override + public void setReference(IntValue toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(IntValue candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + IntValue otherRef = ((IntValueComparator) referencedComparator).reference; + int comp = otherRef.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(IntValue first, IntValue second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + reference.read(firstSource); + tempReference.read(secondSource); + int comp = reference.compareTo(tempReference); + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(IntValue.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(IntValue record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator duplicate() { + return new IntValueComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(IntValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public IntValue readWithKeyDenormalization(IntValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..dfe1094a3dc3a16f7076b24e6b6f44910772a245 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueComparator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NormalizableKey; + +import java.io.IOException; + +/** + * Specialized comparator for LongValue based on CopyableValueComparator. + */ +@Internal +public class LongValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final LongValue reference = new LongValue(); + + private final LongValue tempReference = new LongValue(); + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public LongValueComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(LongValue record) { + return record.hashCode(); + } + + @Override + public void setReference(LongValue toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(LongValue candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + LongValue otherRef = ((LongValueComparator) referencedComparator).reference; + int comp = otherRef.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(LongValue first, LongValue second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + reference.read(firstSource); + tempReference.read(secondSource); + int comp = reference.compareTo(tempReference); + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(LongValue.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(LongValue record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator duplicate() { + return new LongValueComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(LongValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public LongValue readWithKeyDenormalization(LongValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..7212cc774002bd9366c3c86c7427f1b0250bc22c --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueComparator.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.NormalizableKey; +import org.apache.flink.types.NullValue; + +import java.io.IOException; + +/** + * Specialized comparator for NullValue based on CopyableValueComparator. + */ +@Internal +public class NullValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + private final static NullValueComparator INSTANCE = new NullValueComparator(); + + public static NullValueComparator getInstance() { + return INSTANCE; + } + + private NullValueComparator() {} + + @Override + public int hash(NullValue record) { + return record.hashCode(); + } + + @Override + public void setReference(NullValue toCompare) {} + + @Override + public boolean equalToReference(NullValue candidate) { + return true; + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + return 0; + } + + @Override + public int compare(NullValue first, NullValue second) { + return 0; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + return 0; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(NullValue.class); + } + + @Override + public int getNormalizeKeyLen() { + return NullValue.getInstance().getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(NullValue record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return false; + } + + @Override + public TypeComparator duplicate() { + return NullValueComparator.getInstance(); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(NullValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public NullValue readWithKeyDenormalization(NullValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java new file mode 100644 index 0000000000000000000000000000000000000000..3303d574b7c7e2ba7f123c225bcb90075379bd89 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/NullValueSerializer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.types.NullValue; + +import java.io.IOException; + +@Internal +public final class NullValueSerializer extends TypeSerializerSingleton { + + private static final long serialVersionUID = 1L; + + public static final NullValueSerializer INSTANCE = new NullValueSerializer(); + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public NullValue createInstance() { + return NullValue.getInstance(); + } + + @Override + public NullValue copy(NullValue from) { + return NullValue.getInstance(); + } + + @Override + public NullValue copy(NullValue from, NullValue reuse) { + return NullValue.getInstance(); + } + + @Override + public int getLength() { + return 0; + } + + @Override + public void serialize(NullValue record, DataOutputView target) throws IOException { + } + + @Override + public NullValue deserialize(DataInputView source) throws IOException { + return NullValue.getInstance(); + } + + @Override + public NullValue deserialize(NullValue reuse, DataInputView source) throws IOException { + return NullValue.getInstance(); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof NullValueSerializer; + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..a776b704e1843cc907208e09acb749e2faff49fd --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueComparator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.NormalizableKey; +import org.apache.flink.types.ShortValue; + +import java.io.IOException; + +/** + * Specialized comparator for ShortValue based on CopyableValueComparator. + */ +@Internal +public class ShortValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final ShortValue reference = new ShortValue(); + + private final ShortValue tempReference = new ShortValue(); + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public ShortValueComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(ShortValue record) { + return record.hashCode(); + } + + @Override + public void setReference(ShortValue toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(ShortValue candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + ShortValue otherRef = ((ShortValueComparator) referencedComparator).reference; + int comp = otherRef.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(ShortValue first, ShortValue second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + reference.read(firstSource); + tempReference.read(secondSource); + int comp = reference.compareTo(tempReference); + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(ShortValue.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(ShortValue record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator duplicate() { + return new ShortValueComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(ShortValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ShortValue readWithKeyDenormalization(ShortValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java new file mode 100644 index 0000000000000000000000000000000000000000..c2649014d0351f1f23b659779bed4e7ad16e0617 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueComparator.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.NormalizableKey; +import org.apache.flink.types.StringValue; + +import java.io.IOException; + +/** + * Specialized comparator for StringValue based on CopyableValueComparator. + */ +@Internal +public class StringValueComparator extends TypeComparator { + + private static final long serialVersionUID = 1L; + + private final boolean ascendingComparison; + + private final StringValue reference = new StringValue(); + + private final StringValue tempReference = new StringValue(); + + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public StringValueComparator(boolean ascending) { + this.ascendingComparison = ascending; + } + + @Override + public int hash(StringValue record) { + return record.hashCode(); + } + + @Override + public void setReference(StringValue toCompare) { + toCompare.copyTo(reference); + } + + @Override + public boolean equalToReference(StringValue candidate) { + return candidate.equals(this.reference); + } + + @Override + public int compareToReference(TypeComparator referencedComparator) { + StringValue otherRef = ((StringValueComparator) referencedComparator).reference; + int comp = otherRef.compareTo(reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compare(StringValue first, StringValue second) { + int comp = first.compareTo(second); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + reference.read(firstSource); + tempReference.read(secondSource); + int comp = reference.compareTo(tempReference); + return ascendingComparison ? comp : -comp; + } + + @Override + public boolean supportsNormalizedKey() { + return NormalizableKey.class.isAssignableFrom(StringValue.class); + } + + @Override + public int getNormalizeKeyLen() { + return reference.getMaxNormalizedKeyLen(); + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return keyBytes < getNormalizeKeyLen(); + } + + @Override + public void putNormalizedKey(StringValue record, MemorySegment target, int offset, int numBytes) { + record.copyNormalizedKey(target, offset, numBytes); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public TypeComparator duplicate() { + return new StringValueComparator(ascendingComparison); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + // -------------------------------------------------------------------------------------------- + // unsupported normalization + // -------------------------------------------------------------------------------------------- + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public void writeWithKeyNormalization(StringValue record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public StringValue readWithKeyDenormalization(StringValue reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java index 495a324326dfba3ed335e804cd8d0d01453ae185..5a8334f63e5c4a30095c2768f7b0cdb225bb0b5c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java @@ -26,6 +26,26 @@ import org.apache.flink.api.common.typeinfo.AtomicType; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.BooleanValueComparator; +import org.apache.flink.api.common.typeutils.base.BooleanValueSerializer; +import org.apache.flink.api.common.typeutils.base.ByteValueComparator; +import org.apache.flink.api.common.typeutils.base.ByteValueSerializer; +import org.apache.flink.api.common.typeutils.base.CharValueComparator; +import org.apache.flink.api.common.typeutils.base.CharValueSerializer; +import org.apache.flink.api.common.typeutils.base.DoubleValueComparator; +import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer; +import org.apache.flink.api.common.typeutils.base.FloatValueComparator; +import org.apache.flink.api.common.typeutils.base.FloatValueSerializer; +import org.apache.flink.api.common.typeutils.base.IntValueComparator; +import org.apache.flink.api.common.typeutils.base.IntValueSerializer; +import org.apache.flink.api.common.typeutils.base.LongValueComparator; +import org.apache.flink.api.common.typeutils.base.LongValueSerializer; +import org.apache.flink.api.common.typeutils.base.NullValueComparator; +import org.apache.flink.api.common.typeutils.base.NullValueSerializer; +import org.apache.flink.api.common.typeutils.base.ShortValueComparator; +import org.apache.flink.api.common.typeutils.base.ShortValueSerializer; +import org.apache.flink.api.common.typeutils.base.StringValueComparator; +import org.apache.flink.api.common.typeutils.base.StringValueSerializer; import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator; import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer; import org.apache.flink.api.java.typeutils.runtime.ValueComparator; @@ -126,7 +146,37 @@ public class ValueTypeInfo extends TypeInformation implement @SuppressWarnings("unchecked") @PublicEvolving public TypeSerializer createSerializer(ExecutionConfig executionConfig) { - if (CopyableValue.class.isAssignableFrom(type)) { + if (BooleanValue.class.isAssignableFrom(type)) { + return (TypeSerializer) BooleanValueSerializer.INSTANCE; + } + else if (ByteValue.class.isAssignableFrom(type)) { + return (TypeSerializer) ByteValueSerializer.INSTANCE; + } + else if (CharValue.class.isAssignableFrom(type)) { + return (TypeSerializer) CharValueSerializer.INSTANCE; + } + else if (DoubleValue.class.isAssignableFrom(type)) { + return (TypeSerializer) DoubleValueSerializer.INSTANCE; + } + else if (FloatValue.class.isAssignableFrom(type)) { + return (TypeSerializer) FloatValueSerializer.INSTANCE; + } + else if (IntValue.class.isAssignableFrom(type)) { + return (TypeSerializer) IntValueSerializer.INSTANCE; + } + else if (LongValue.class.isAssignableFrom(type)) { + return (TypeSerializer) LongValueSerializer.INSTANCE; + } + else if (NullValue.class.isAssignableFrom(type)) { + return (TypeSerializer) NullValueSerializer.INSTANCE; + } + else if (ShortValue.class.isAssignableFrom(type)) { + return (TypeSerializer) ShortValueSerializer.INSTANCE; + } + else if (StringValue.class.isAssignableFrom(type)) { + return (TypeSerializer) StringValueSerializer.INSTANCE; + } + else if (CopyableValue.class.isAssignableFrom(type)) { return (TypeSerializer) createCopyableValueSerializer(type.asSubclass(CopyableValue.class)); } else { @@ -141,8 +191,38 @@ public class ValueTypeInfo extends TypeInformation implement if (!isKeyType()) { throw new RuntimeException("The type " + type.getName() + " is not Comparable."); } - - if (CopyableValue.class.isAssignableFrom(type)) { + + if (BooleanValue.class.isAssignableFrom(type)) { + return (TypeComparator) new BooleanValueComparator(sortOrderAscending); + } + else if (ByteValue.class.isAssignableFrom(type)) { + return (TypeComparator) new ByteValueComparator(sortOrderAscending); + } + else if (CharValue.class.isAssignableFrom(type)) { + return (TypeComparator) new CharValueComparator(sortOrderAscending); + } + else if (DoubleValue.class.isAssignableFrom(type)) { + return (TypeComparator) new DoubleValueComparator(sortOrderAscending); + } + else if (FloatValue.class.isAssignableFrom(type)) { + return (TypeComparator) new FloatValueComparator(sortOrderAscending); + } + else if (IntValue.class.isAssignableFrom(type)) { + return (TypeComparator) new IntValueComparator(sortOrderAscending); + } + else if (LongValue.class.isAssignableFrom(type)) { + return (TypeComparator) new LongValueComparator(sortOrderAscending); + } + else if (NullValue.class.isAssignableFrom(type)) { + return (TypeComparator) NullValueComparator.getInstance(); + } + else if (ShortValue.class.isAssignableFrom(type)) { + return (TypeComparator) new ShortValueComparator(sortOrderAscending); + } + else if (StringValue.class.isAssignableFrom(type)) { + return (TypeComparator) new StringValueComparator(sortOrderAscending); + } + else if (CopyableValue.class.isAssignableFrom(type)) { return (TypeComparator) new CopyableValueComparator(sortOrderAscending, type); } else { diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..62ca95db13a5d23b66d76587cbf2477114af3050 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/BooleanValueComparatorTest.java @@ -0,0 +1,44 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.BooleanValue; + +public class BooleanValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new BooleanValueComparator(ascending); + } + + @Override + protected TypeSerializer createSerializer() { + return new BooleanValueSerializer(); + } + + @Override + protected BooleanValue[] getSortedTestData() { + return new BooleanValue[]{BooleanValue.FALSE, BooleanValue.TRUE}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f14e3d25570be42fd1a8e7e665e63821eac2de2f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ByteValueComparatorTest.java @@ -0,0 +1,66 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.ByteValue; + +import java.util.Random; + +public class ByteValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new ByteValueComparator(ascending); + } + + @Override + protected TypeSerializer createSerializer() { + return new ByteValueSerializer(); + } + + @Override + protected ByteValue[] getSortedTestData() { + + Random rnd = new Random(874597969123412338L); + int rndByte = rnd.nextInt(Byte.MAX_VALUE); + if (rndByte < 0) { + rndByte = -rndByte; + } + if (rndByte == Byte.MAX_VALUE) { + rndByte -= 3; + } + if (rndByte <= 2) { + rndByte += 3; + } + return new ByteValue[]{ + new ByteValue(Byte.MIN_VALUE), + new ByteValue(Integer.valueOf(-rndByte).byteValue()), + new ByteValue(Integer.valueOf(-1).byteValue()), + new ByteValue(Integer.valueOf(0).byteValue()), + new ByteValue(Integer.valueOf(1).byteValue()), + new ByteValue(Integer.valueOf(2).byteValue()), + new ByteValue(Integer.valueOf(rndByte).byteValue()), + new ByteValue(Byte.MAX_VALUE)}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f952fc46597b96002af8826a627f606c320beeed --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/CharValueComparatorTest.java @@ -0,0 +1,62 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.CharValue; + +import java.util.Random; + +public class CharValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new CharValueComparator(ascending); + } + + @Override + protected TypeSerializer createSerializer() { + return new CharValueSerializer(); + } + + @Override + protected CharValue[] getSortedTestData() { + Random rnd = new Random(874597969123412338L); + int rndChar = rnd.nextInt(Character.MAX_VALUE); + if(rndChar<0){ + rndChar=-rndChar; + } + if(rndChar==(int)Character.MIN_VALUE){ + rndChar+=2; + } + if(rndChar==(int)Character.MAX_VALUE){ + rndChar-=2; + } + return new CharValue[]{ + new CharValue(Character.MIN_VALUE), + new CharValue((char)rndChar), + new CharValue((char)(rndChar+1)), + new CharValue(Character.MAX_VALUE) + }; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..b50cecab0e00f4352ff7b16c58952b76fa4e4855 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/DoubleValueComparatorTest.java @@ -0,0 +1,63 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.DoubleValue; + +import java.util.Random; + +public class DoubleValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new DoubleValueComparator(ascending); + } + + @Override + protected TypeSerializer createSerializer() { + return new DoubleValueSerializer(); + } + + @Override + protected DoubleValue[] getSortedTestData() { + Random rnd = new Random(874597969123412338L); + double rndDouble = rnd.nextDouble(); + if (rndDouble < 0) { + rndDouble = -rndDouble; + } + if (rndDouble == Double.MAX_VALUE) { + rndDouble -= 3; + } + if (rndDouble <= 2) { + rndDouble += 3; + } + return new DoubleValue[]{ + new DoubleValue(-rndDouble), + new DoubleValue(-1.0D), + new DoubleValue(0.0D), + new DoubleValue(2.0D), + new DoubleValue(rndDouble), + new DoubleValue(Double.MAX_VALUE)}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..67c5ba90e80c0299e046d55940a7694696e3e74b --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/FloatValueComparatorTest.java @@ -0,0 +1,63 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.FloatValue; + +import java.util.Random; + +public class FloatValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new FloatValueComparator(ascending); + } + + @Override + protected TypeSerializer createSerializer() { + return new FloatValueSerializer(); + } + + @Override + protected FloatValue[] getSortedTestData() { + Random rnd = new Random(874597969123412338L); + float rndFloat = rnd.nextFloat(); + if (rndFloat < 0) { + rndFloat = -rndFloat; + } + if (rndFloat == Float.MAX_VALUE) { + rndFloat -= 3; + } + if (rndFloat <= 2) { + rndFloat += 3; + } + return new FloatValue[]{ + new FloatValue(-rndFloat), + new FloatValue(-1.0F), + new FloatValue(0.0F), + new FloatValue(2.0F), + new FloatValue(rndFloat), + new FloatValue(Float.MAX_VALUE)}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..037eb56d6fd70fe9872afcfe046273be26766bcd --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/IntValueComparatorTest.java @@ -0,0 +1,66 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.IntValue; + +import java.util.Random; + +public class IntValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new IntValueComparator(ascending); + } + + @Override + protected TypeSerializer createSerializer() { + return new IntValueSerializer(); + } + + @Override + protected IntValue[] getSortedTestData() { + + Random rnd = new Random(874597969123412338L); + int rndInt = rnd.nextInt(); + if (rndInt < 0) { + rndInt = -rndInt; + } + if (rndInt == Integer.MAX_VALUE) { + rndInt -= 3; + } + if (rndInt <= 2) { + rndInt += 3; + } + return new IntValue[]{ + new IntValue(Integer.MIN_VALUE), + new IntValue(-rndInt), + new IntValue(-1), + new IntValue(0), + new IntValue(1), + new IntValue(2), + new IntValue(rndInt), + new IntValue(Integer.MAX_VALUE)}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..6871a6ba6927aa961f7eb1bf4f1594ba941857a5 --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/LongValueComparatorTest.java @@ -0,0 +1,65 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.LongValue; + +import java.util.Random; + +public class LongValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new LongValueComparator(ascending); + } + + @Override + protected TypeSerializer createSerializer() { + return new LongValueSerializer(); + } + + @Override + protected LongValue[] getSortedTestData() { + Random rnd = new Random(874597969123412338L); + long rndLong = rnd.nextLong(); + if (rndLong < 0) { + rndLong = -rndLong; + } + if (rndLong == Long.MAX_VALUE) { + rndLong -= 3; + } + if (rndLong <= 2) { + rndLong += 3; + } + return new LongValue[]{ + new LongValue(Long.MIN_VALUE), + new LongValue(-rndLong), + new LongValue(-1L), + new LongValue(0L), + new LongValue(1L), + new LongValue(2L), + new LongValue(rndLong), + new LongValue(Long.MAX_VALUE)}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..7d4a3a0ec6436bfe060fa16e52c40e01080ab2fd --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/ShortValueComparatorTest.java @@ -0,0 +1,65 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.ShortValue; + +import java.util.Random; + +public class ShortValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new ShortValueComparator(ascending); + } + + @Override + protected TypeSerializer createSerializer() { + return new ShortValueSerializer(); + } + + @Override + protected ShortValue[] getSortedTestData() { + Random rnd = new Random(874597969123412338L); + short rndShort = Integer.valueOf(rnd.nextInt()).shortValue(); + if (rndShort < 0) { + rndShort = Integer.valueOf(-rndShort).shortValue(); + } + if (rndShort == Short.MAX_VALUE) { + rndShort -= 3; + } + if (rndShort <= 2) { + rndShort += 3; + } + return new ShortValue[]{ + new ShortValue(Short.MIN_VALUE), + new ShortValue(Integer.valueOf(-rndShort).shortValue()), + new ShortValue(Integer.valueOf(-1).shortValue()), + new ShortValue(Integer.valueOf(0).shortValue()), + new ShortValue(Integer.valueOf(1).shortValue()), + new ShortValue(Integer.valueOf(2).shortValue()), + new ShortValue(Integer.valueOf(rndShort).shortValue()), + new ShortValue(Short.MAX_VALUE)}; + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java new file mode 100644 index 0000000000000000000000000000000000000000..e0b2f703a48cb5fc2b39e53b65c5a0cab0335b6f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/base/StringValueComparatorTest.java @@ -0,0 +1,53 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.ComparatorTestBase; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.types.StringValue; + +public class StringValueComparatorTest extends ComparatorTestBase { + + @Override + protected TypeComparator createComparator(boolean ascending) { + return new StringValueComparator(ascending); + } + + @Override + protected TypeSerializer createSerializer() { + return new StringValueSerializer(); + } + + @Override + protected StringValue[] getSortedTestData() { + return new StringValue[]{ + new StringValue(""), + new StringValue("Lorem Ipsum Dolor Omit Longer"), + new StringValue("aaaa"), + new StringValue("abcd"), + new StringValue("abce"), + new StringValue("abdd"), + new StringValue("accd"), + new StringValue("bbcd") + }; + } +}