提交 08b075aa 编写于 作者: G Greg Hogan

[FLINK-3868] [core] Specialized CopyableValue serializers and comparators

Update ValueTypeInfo to use specialized serializers and comparators,
many of which were already present.

This closes #1983
上级 7ec6d7b5
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.types.NormalizableKey;
import java.io.IOException;
/**
* Specialized comparator for BooleanValue based on CopyableValueComparator.
*/
@Internal
public class BooleanValueComparator extends TypeComparator<BooleanValue> {
private static final long serialVersionUID = 1L;
private final boolean ascendingComparison;
private final BooleanValue reference = new BooleanValue();
private final BooleanValue tempReference = new BooleanValue();
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
public BooleanValueComparator(boolean ascending) {
this.ascendingComparison = ascending;
}
@Override
public int hash(BooleanValue record) {
return record.hashCode();
}
@Override
public void setReference(BooleanValue toCompare) {
toCompare.copyTo(reference);
}
@Override
public boolean equalToReference(BooleanValue candidate) {
return candidate.equals(this.reference);
}
@Override
public int compareToReference(TypeComparator<BooleanValue> referencedComparator) {
BooleanValue otherRef = ((BooleanValueComparator) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
@Override
public int compare(BooleanValue first, BooleanValue second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
reference.read(firstSource);
tempReference.read(secondSource);
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(BooleanValue.class);
}
@Override
public int getNormalizeKeyLen() {
return reference.getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(BooleanValue record, MemorySegment target, int offset, int numBytes) {
record.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
@Override
public TypeComparator<BooleanValue> duplicate() {
return new BooleanValueComparator(ascendingComparison);
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(BooleanValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public BooleanValue readWithKeyDenormalization(BooleanValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.NormalizableKey;
import java.io.IOException;
/**
* Specialized comparator for ByteValue based on CopyableValueComparator.
*/
@Internal
public class ByteValueComparator extends TypeComparator<ByteValue> {
private static final long serialVersionUID = 1L;
private final boolean ascendingComparison;
private final ByteValue reference = new ByteValue();
private final ByteValue tempReference = new ByteValue();
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
public ByteValueComparator(boolean ascending) {
this.ascendingComparison = ascending;
}
@Override
public int hash(ByteValue record) {
return record.hashCode();
}
@Override
public void setReference(ByteValue toCompare) {
toCompare.copyTo(reference);
}
@Override
public boolean equalToReference(ByteValue candidate) {
return candidate.equals(this.reference);
}
@Override
public int compareToReference(TypeComparator<ByteValue> referencedComparator) {
ByteValue otherRef = ((ByteValueComparator) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
@Override
public int compare(ByteValue first, ByteValue second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
reference.read(firstSource);
tempReference.read(secondSource);
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(ByteValue.class);
}
@Override
public int getNormalizeKeyLen() {
return reference.getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(ByteValue record, MemorySegment target, int offset, int numBytes) {
record.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
@Override
public TypeComparator<ByteValue> duplicate() {
return new ByteValueComparator(ascendingComparison);
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(ByteValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public ByteValue readWithKeyDenormalization(ByteValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.CharValue;
import org.apache.flink.types.NormalizableKey;
import java.io.IOException;
/**
* Specialized comparator for CharValue based on CopyableValueComparator.
*/
@Internal
public class CharValueComparator extends TypeComparator<CharValue> {
private static final long serialVersionUID = 1L;
private final boolean ascendingComparison;
private final CharValue reference = new CharValue();
private final CharValue tempReference = new CharValue();
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
public CharValueComparator(boolean ascending) {
this.ascendingComparison = ascending;
}
@Override
public int hash(CharValue record) {
return record.hashCode();
}
@Override
public void setReference(CharValue toCompare) {
toCompare.copyTo(reference);
}
@Override
public boolean equalToReference(CharValue candidate) {
return candidate.equals(this.reference);
}
@Override
public int compareToReference(TypeComparator<CharValue> referencedComparator) {
CharValue otherRef = ((CharValueComparator) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
@Override
public int compare(CharValue first, CharValue second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
reference.read(firstSource);
tempReference.read(secondSource);
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(CharValue.class);
}
@Override
public int getNormalizeKeyLen() {
return reference.getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(CharValue record, MemorySegment target, int offset, int numBytes) {
record.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
@Override
public TypeComparator<CharValue> duplicate() {
return new CharValueComparator(ascendingComparison);
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(CharValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public CharValue readWithKeyDenormalization(CharValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.DoubleValue;
import org.apache.flink.types.NormalizableKey;
import java.io.IOException;
/**
* Specialized comparator for DoubleValue based on CopyableValueComparator.
*/
@Internal
public class DoubleValueComparator extends TypeComparator<DoubleValue> {
private static final long serialVersionUID = 1L;
private final boolean ascendingComparison;
private final DoubleValue reference = new DoubleValue();
private final DoubleValue tempReference = new DoubleValue();
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
public DoubleValueComparator(boolean ascending) {
this.ascendingComparison = ascending;
}
@Override
public int hash(DoubleValue record) {
return record.hashCode();
}
@Override
public void setReference(DoubleValue toCompare) {
toCompare.copyTo(reference);
}
@Override
public boolean equalToReference(DoubleValue candidate) {
return candidate.equals(this.reference);
}
@Override
public int compareToReference(TypeComparator<DoubleValue> referencedComparator) {
DoubleValue otherRef = ((DoubleValueComparator) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
@Override
public int compare(DoubleValue first, DoubleValue second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
reference.read(firstSource);
tempReference.read(secondSource);
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(DoubleValue.class);
}
@Override
public int getNormalizeKeyLen() {
NormalizableKey<?> key = (NormalizableKey<?>) reference;
return key.getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(DoubleValue record, MemorySegment target, int offset, int numBytes) {
NormalizableKey<?> key = (NormalizableKey<?>) record;
key.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
@Override
public TypeComparator<DoubleValue> duplicate() {
return new DoubleValueComparator(ascendingComparison);
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(DoubleValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public DoubleValue readWithKeyDenormalization(DoubleValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.FloatValue;
import org.apache.flink.types.NormalizableKey;
import java.io.IOException;
/**
* Specialized comparator for FloatValue based on CopyableValueComparator.
*/
@Internal
public class FloatValueComparator extends TypeComparator<FloatValue> {
private static final long serialVersionUID = 1L;
private final boolean ascendingComparison;
private final FloatValue reference = new FloatValue();
private final FloatValue tempReference = new FloatValue();
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
public FloatValueComparator(boolean ascending) {
this.ascendingComparison = ascending;
}
@Override
public int hash(FloatValue record) {
return record.hashCode();
}
@Override
public void setReference(FloatValue toCompare) {
toCompare.copyTo(reference);
}
@Override
public boolean equalToReference(FloatValue candidate) {
return candidate.equals(this.reference);
}
@Override
public int compareToReference(TypeComparator<FloatValue> referencedComparator) {
FloatValue otherRef = ((FloatValueComparator) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
@Override
public int compare(FloatValue first, FloatValue second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
reference.read(firstSource);
tempReference.read(secondSource);
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(FloatValue.class);
}
@Override
public int getNormalizeKeyLen() {
NormalizableKey<?> key = (NormalizableKey<?>) reference;
return key.getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(FloatValue record, MemorySegment target, int offset, int numBytes) {
NormalizableKey<?> key = (NormalizableKey<?>) record;
key.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
@Override
public TypeComparator<FloatValue> duplicate() {
return new FloatValueComparator(ascendingComparison);
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(FloatValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public FloatValue readWithKeyDenormalization(FloatValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.NormalizableKey;
import java.io.IOException;
/**
* Specialized comparator for IntValue based on CopyableValueComparator.
*/
@Internal
public class IntValueComparator extends TypeComparator<IntValue> {
private static final long serialVersionUID = 1L;
private final boolean ascendingComparison;
private final IntValue reference = new IntValue();
private final IntValue tempReference = new IntValue();
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
public IntValueComparator(boolean ascending) {
this.ascendingComparison = ascending;
}
@Override
public int hash(IntValue record) {
return record.hashCode();
}
@Override
public void setReference(IntValue toCompare) {
toCompare.copyTo(reference);
}
@Override
public boolean equalToReference(IntValue candidate) {
return candidate.equals(this.reference);
}
@Override
public int compareToReference(TypeComparator<IntValue> referencedComparator) {
IntValue otherRef = ((IntValueComparator) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
@Override
public int compare(IntValue first, IntValue second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
reference.read(firstSource);
tempReference.read(secondSource);
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(IntValue.class);
}
@Override
public int getNormalizeKeyLen() {
return reference.getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(IntValue record, MemorySegment target, int offset, int numBytes) {
record.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
@Override
public TypeComparator<IntValue> duplicate() {
return new IntValueComparator(ascendingComparison);
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(IntValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public IntValue readWithKeyDenormalization(IntValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NormalizableKey;
import java.io.IOException;
/**
* Specialized comparator for LongValue based on CopyableValueComparator.
*/
@Internal
public class LongValueComparator extends TypeComparator<LongValue> {
private static final long serialVersionUID = 1L;
private final boolean ascendingComparison;
private final LongValue reference = new LongValue();
private final LongValue tempReference = new LongValue();
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
public LongValueComparator(boolean ascending) {
this.ascendingComparison = ascending;
}
@Override
public int hash(LongValue record) {
return record.hashCode();
}
@Override
public void setReference(LongValue toCompare) {
toCompare.copyTo(reference);
}
@Override
public boolean equalToReference(LongValue candidate) {
return candidate.equals(this.reference);
}
@Override
public int compareToReference(TypeComparator<LongValue> referencedComparator) {
LongValue otherRef = ((LongValueComparator) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
@Override
public int compare(LongValue first, LongValue second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
reference.read(firstSource);
tempReference.read(secondSource);
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(LongValue.class);
}
@Override
public int getNormalizeKeyLen() {
return reference.getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(LongValue record, MemorySegment target, int offset, int numBytes) {
record.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
@Override
public TypeComparator<LongValue> duplicate() {
return new LongValueComparator(ascendingComparison);
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(LongValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public LongValue readWithKeyDenormalization(LongValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;
import org.apache.flink.types.NullValue;
import java.io.IOException;
/**
* Specialized comparator for NullValue based on CopyableValueComparator.
*/
@Internal
public class NullValueComparator extends TypeComparator<NullValue> {
private static final long serialVersionUID = 1L;
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
private final static NullValueComparator INSTANCE = new NullValueComparator();
public static NullValueComparator getInstance() {
return INSTANCE;
}
private NullValueComparator() {}
@Override
public int hash(NullValue record) {
return record.hashCode();
}
@Override
public void setReference(NullValue toCompare) {}
@Override
public boolean equalToReference(NullValue candidate) {
return true;
}
@Override
public int compareToReference(TypeComparator<NullValue> referencedComparator) {
return 0;
}
@Override
public int compare(NullValue first, NullValue second) {
return 0;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
return 0;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(NullValue.class);
}
@Override
public int getNormalizeKeyLen() {
return NullValue.getInstance().getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(NullValue record, MemorySegment target, int offset, int numBytes) {
record.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return false;
}
@Override
public TypeComparator<NullValue> duplicate() {
return NullValueComparator.getInstance();
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(NullValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public NullValue readWithKeyDenormalization(NullValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.NullValue;
import java.io.IOException;
@Internal
public final class NullValueSerializer extends TypeSerializerSingleton<NullValue> {
private static final long serialVersionUID = 1L;
public static final NullValueSerializer INSTANCE = new NullValueSerializer();
@Override
public boolean isImmutableType() {
return false;
}
@Override
public NullValue createInstance() {
return NullValue.getInstance();
}
@Override
public NullValue copy(NullValue from) {
return NullValue.getInstance();
}
@Override
public NullValue copy(NullValue from, NullValue reuse) {
return NullValue.getInstance();
}
@Override
public int getLength() {
return 0;
}
@Override
public void serialize(NullValue record, DataOutputView target) throws IOException {
}
@Override
public NullValue deserialize(DataInputView source) throws IOException {
return NullValue.getInstance();
}
@Override
public NullValue deserialize(NullValue reuse, DataInputView source) throws IOException {
return NullValue.getInstance();
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof NullValueSerializer;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;
import org.apache.flink.types.ShortValue;
import java.io.IOException;
/**
* Specialized comparator for ShortValue based on CopyableValueComparator.
*/
@Internal
public class ShortValueComparator extends TypeComparator<ShortValue> {
private static final long serialVersionUID = 1L;
private final boolean ascendingComparison;
private final ShortValue reference = new ShortValue();
private final ShortValue tempReference = new ShortValue();
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
public ShortValueComparator(boolean ascending) {
this.ascendingComparison = ascending;
}
@Override
public int hash(ShortValue record) {
return record.hashCode();
}
@Override
public void setReference(ShortValue toCompare) {
toCompare.copyTo(reference);
}
@Override
public boolean equalToReference(ShortValue candidate) {
return candidate.equals(this.reference);
}
@Override
public int compareToReference(TypeComparator<ShortValue> referencedComparator) {
ShortValue otherRef = ((ShortValueComparator) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
@Override
public int compare(ShortValue first, ShortValue second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
reference.read(firstSource);
tempReference.read(secondSource);
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(ShortValue.class);
}
@Override
public int getNormalizeKeyLen() {
return reference.getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(ShortValue record, MemorySegment target, int offset, int numBytes) {
record.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
@Override
public TypeComparator<ShortValue> duplicate() {
return new ShortValueComparator(ascendingComparison);
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(ShortValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public ShortValue readWithKeyDenormalization(ShortValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.types.NormalizableKey;
import org.apache.flink.types.StringValue;
import java.io.IOException;
/**
* Specialized comparator for StringValue based on CopyableValueComparator.
*/
@Internal
public class StringValueComparator extends TypeComparator<StringValue> {
private static final long serialVersionUID = 1L;
private final boolean ascendingComparison;
private final StringValue reference = new StringValue();
private final StringValue tempReference = new StringValue();
private final TypeComparator<?>[] comparators = new TypeComparator[] {this};
public StringValueComparator(boolean ascending) {
this.ascendingComparison = ascending;
}
@Override
public int hash(StringValue record) {
return record.hashCode();
}
@Override
public void setReference(StringValue toCompare) {
toCompare.copyTo(reference);
}
@Override
public boolean equalToReference(StringValue candidate) {
return candidate.equals(this.reference);
}
@Override
public int compareToReference(TypeComparator<StringValue> referencedComparator) {
StringValue otherRef = ((StringValueComparator) referencedComparator).reference;
int comp = otherRef.compareTo(reference);
return ascendingComparison ? comp : -comp;
}
@Override
public int compare(StringValue first, StringValue second) {
int comp = first.compareTo(second);
return ascendingComparison ? comp : -comp;
}
@Override
public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException {
reference.read(firstSource);
tempReference.read(secondSource);
int comp = reference.compareTo(tempReference);
return ascendingComparison ? comp : -comp;
}
@Override
public boolean supportsNormalizedKey() {
return NormalizableKey.class.isAssignableFrom(StringValue.class);
}
@Override
public int getNormalizeKeyLen() {
return reference.getMaxNormalizedKeyLen();
}
@Override
public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
return keyBytes < getNormalizeKeyLen();
}
@Override
public void putNormalizedKey(StringValue record, MemorySegment target, int offset, int numBytes) {
record.copyNormalizedKey(target, offset, numBytes);
}
@Override
public boolean invertNormalizedKey() {
return !ascendingComparison;
}
@Override
public TypeComparator<StringValue> duplicate() {
return new StringValueComparator(ascendingComparison);
}
@Override
public int extractKeys(Object record, Object[] target, int index) {
target[index] = record;
return 1;
}
@Override
public TypeComparator<?>[] getFlatComparators() {
return comparators;
}
// --------------------------------------------------------------------------------------------
// unsupported normalization
// --------------------------------------------------------------------------------------------
@Override
public boolean supportsSerializationWithKeyNormalization() {
return false;
}
@Override
public void writeWithKeyNormalization(StringValue record, DataOutputView target) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public StringValue readWithKeyDenormalization(StringValue reuse, DataInputView source) throws IOException {
throw new UnsupportedOperationException();
}
}
......@@ -26,6 +26,26 @@ import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.BooleanValueComparator;
import org.apache.flink.api.common.typeutils.base.BooleanValueSerializer;
import org.apache.flink.api.common.typeutils.base.ByteValueComparator;
import org.apache.flink.api.common.typeutils.base.ByteValueSerializer;
import org.apache.flink.api.common.typeutils.base.CharValueComparator;
import org.apache.flink.api.common.typeutils.base.CharValueSerializer;
import org.apache.flink.api.common.typeutils.base.DoubleValueComparator;
import org.apache.flink.api.common.typeutils.base.DoubleValueSerializer;
import org.apache.flink.api.common.typeutils.base.FloatValueComparator;
import org.apache.flink.api.common.typeutils.base.FloatValueSerializer;
import org.apache.flink.api.common.typeutils.base.IntValueComparator;
import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.api.common.typeutils.base.LongValueComparator;
import org.apache.flink.api.common.typeutils.base.LongValueSerializer;
import org.apache.flink.api.common.typeutils.base.NullValueComparator;
import org.apache.flink.api.common.typeutils.base.NullValueSerializer;
import org.apache.flink.api.common.typeutils.base.ShortValueComparator;
import org.apache.flink.api.common.typeutils.base.ShortValueSerializer;
import org.apache.flink.api.common.typeutils.base.StringValueComparator;
import org.apache.flink.api.common.typeutils.base.StringValueSerializer;
import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator;
import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer;
import org.apache.flink.api.java.typeutils.runtime.ValueComparator;
......@@ -126,7 +146,37 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
@SuppressWarnings("unchecked")
@PublicEvolving
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
if (CopyableValue.class.isAssignableFrom(type)) {
if (BooleanValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) BooleanValueSerializer.INSTANCE;
}
else if (ByteValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) ByteValueSerializer.INSTANCE;
}
else if (CharValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) CharValueSerializer.INSTANCE;
}
else if (DoubleValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) DoubleValueSerializer.INSTANCE;
}
else if (FloatValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) FloatValueSerializer.INSTANCE;
}
else if (IntValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) IntValueSerializer.INSTANCE;
}
else if (LongValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) LongValueSerializer.INSTANCE;
}
else if (NullValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) NullValueSerializer.INSTANCE;
}
else if (ShortValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) ShortValueSerializer.INSTANCE;
}
else if (StringValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) StringValueSerializer.INSTANCE;
}
else if (CopyableValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) createCopyableValueSerializer(type.asSubclass(CopyableValue.class));
}
else {
......@@ -142,7 +192,37 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
throw new RuntimeException("The type " + type.getName() + " is not Comparable.");
}
if (CopyableValue.class.isAssignableFrom(type)) {
if (BooleanValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new BooleanValueComparator(sortOrderAscending);
}
else if (ByteValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new ByteValueComparator(sortOrderAscending);
}
else if (CharValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new CharValueComparator(sortOrderAscending);
}
else if (DoubleValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new DoubleValueComparator(sortOrderAscending);
}
else if (FloatValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new FloatValueComparator(sortOrderAscending);
}
else if (IntValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new IntValueComparator(sortOrderAscending);
}
else if (LongValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new LongValueComparator(sortOrderAscending);
}
else if (NullValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) NullValueComparator.getInstance();
}
else if (ShortValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new ShortValueComparator(sortOrderAscending);
}
else if (StringValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new StringValueComparator(sortOrderAscending);
}
else if (CopyableValue.class.isAssignableFrom(type)) {
return (TypeComparator<T>) new CopyableValueComparator(sortOrderAscending, type);
}
else {
......
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.BooleanValue;
public class BooleanValueComparatorTest extends ComparatorTestBase<BooleanValue> {
@Override
protected TypeComparator<BooleanValue> createComparator(boolean ascending) {
return new BooleanValueComparator(ascending);
}
@Override
protected TypeSerializer<BooleanValue> createSerializer() {
return new BooleanValueSerializer();
}
@Override
protected BooleanValue[] getSortedTestData() {
return new BooleanValue[]{BooleanValue.FALSE, BooleanValue.TRUE};
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.ByteValue;
import java.util.Random;
public class ByteValueComparatorTest extends ComparatorTestBase<ByteValue> {
@Override
protected TypeComparator<ByteValue> createComparator(boolean ascending) {
return new ByteValueComparator(ascending);
}
@Override
protected TypeSerializer<ByteValue> createSerializer() {
return new ByteValueSerializer();
}
@Override
protected ByteValue[] getSortedTestData() {
Random rnd = new Random(874597969123412338L);
int rndByte = rnd.nextInt(Byte.MAX_VALUE);
if (rndByte < 0) {
rndByte = -rndByte;
}
if (rndByte == Byte.MAX_VALUE) {
rndByte -= 3;
}
if (rndByte <= 2) {
rndByte += 3;
}
return new ByteValue[]{
new ByteValue(Byte.MIN_VALUE),
new ByteValue(Integer.valueOf(-rndByte).byteValue()),
new ByteValue(Integer.valueOf(-1).byteValue()),
new ByteValue(Integer.valueOf(0).byteValue()),
new ByteValue(Integer.valueOf(1).byteValue()),
new ByteValue(Integer.valueOf(2).byteValue()),
new ByteValue(Integer.valueOf(rndByte).byteValue()),
new ByteValue(Byte.MAX_VALUE)};
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.CharValue;
import java.util.Random;
public class CharValueComparatorTest extends ComparatorTestBase<CharValue> {
@Override
protected TypeComparator<CharValue> createComparator(boolean ascending) {
return new CharValueComparator(ascending);
}
@Override
protected TypeSerializer<CharValue> createSerializer() {
return new CharValueSerializer();
}
@Override
protected CharValue[] getSortedTestData() {
Random rnd = new Random(874597969123412338L);
int rndChar = rnd.nextInt(Character.MAX_VALUE);
if(rndChar<0){
rndChar=-rndChar;
}
if(rndChar==(int)Character.MIN_VALUE){
rndChar+=2;
}
if(rndChar==(int)Character.MAX_VALUE){
rndChar-=2;
}
return new CharValue[]{
new CharValue(Character.MIN_VALUE),
new CharValue((char)rndChar),
new CharValue((char)(rndChar+1)),
new CharValue(Character.MAX_VALUE)
};
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.DoubleValue;
import java.util.Random;
public class DoubleValueComparatorTest extends ComparatorTestBase<DoubleValue> {
@Override
protected TypeComparator<DoubleValue> createComparator(boolean ascending) {
return new DoubleValueComparator(ascending);
}
@Override
protected TypeSerializer<DoubleValue> createSerializer() {
return new DoubleValueSerializer();
}
@Override
protected DoubleValue[] getSortedTestData() {
Random rnd = new Random(874597969123412338L);
double rndDouble = rnd.nextDouble();
if (rndDouble < 0) {
rndDouble = -rndDouble;
}
if (rndDouble == Double.MAX_VALUE) {
rndDouble -= 3;
}
if (rndDouble <= 2) {
rndDouble += 3;
}
return new DoubleValue[]{
new DoubleValue(-rndDouble),
new DoubleValue(-1.0D),
new DoubleValue(0.0D),
new DoubleValue(2.0D),
new DoubleValue(rndDouble),
new DoubleValue(Double.MAX_VALUE)};
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.FloatValue;
import java.util.Random;
public class FloatValueComparatorTest extends ComparatorTestBase<FloatValue> {
@Override
protected TypeComparator<FloatValue> createComparator(boolean ascending) {
return new FloatValueComparator(ascending);
}
@Override
protected TypeSerializer<FloatValue> createSerializer() {
return new FloatValueSerializer();
}
@Override
protected FloatValue[] getSortedTestData() {
Random rnd = new Random(874597969123412338L);
float rndFloat = rnd.nextFloat();
if (rndFloat < 0) {
rndFloat = -rndFloat;
}
if (rndFloat == Float.MAX_VALUE) {
rndFloat -= 3;
}
if (rndFloat <= 2) {
rndFloat += 3;
}
return new FloatValue[]{
new FloatValue(-rndFloat),
new FloatValue(-1.0F),
new FloatValue(0.0F),
new FloatValue(2.0F),
new FloatValue(rndFloat),
new FloatValue(Float.MAX_VALUE)};
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.IntValue;
import java.util.Random;
public class IntValueComparatorTest extends ComparatorTestBase<IntValue> {
@Override
protected TypeComparator<IntValue> createComparator(boolean ascending) {
return new IntValueComparator(ascending);
}
@Override
protected TypeSerializer<IntValue> createSerializer() {
return new IntValueSerializer();
}
@Override
protected IntValue[] getSortedTestData() {
Random rnd = new Random(874597969123412338L);
int rndInt = rnd.nextInt();
if (rndInt < 0) {
rndInt = -rndInt;
}
if (rndInt == Integer.MAX_VALUE) {
rndInt -= 3;
}
if (rndInt <= 2) {
rndInt += 3;
}
return new IntValue[]{
new IntValue(Integer.MIN_VALUE),
new IntValue(-rndInt),
new IntValue(-1),
new IntValue(0),
new IntValue(1),
new IntValue(2),
new IntValue(rndInt),
new IntValue(Integer.MAX_VALUE)};
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.LongValue;
import java.util.Random;
public class LongValueComparatorTest extends ComparatorTestBase<LongValue> {
@Override
protected TypeComparator<LongValue> createComparator(boolean ascending) {
return new LongValueComparator(ascending);
}
@Override
protected TypeSerializer<LongValue> createSerializer() {
return new LongValueSerializer();
}
@Override
protected LongValue[] getSortedTestData() {
Random rnd = new Random(874597969123412338L);
long rndLong = rnd.nextLong();
if (rndLong < 0) {
rndLong = -rndLong;
}
if (rndLong == Long.MAX_VALUE) {
rndLong -= 3;
}
if (rndLong <= 2) {
rndLong += 3;
}
return new LongValue[]{
new LongValue(Long.MIN_VALUE),
new LongValue(-rndLong),
new LongValue(-1L),
new LongValue(0L),
new LongValue(1L),
new LongValue(2L),
new LongValue(rndLong),
new LongValue(Long.MAX_VALUE)};
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.ShortValue;
import java.util.Random;
public class ShortValueComparatorTest extends ComparatorTestBase<ShortValue> {
@Override
protected TypeComparator<ShortValue> createComparator(boolean ascending) {
return new ShortValueComparator(ascending);
}
@Override
protected TypeSerializer<ShortValue> createSerializer() {
return new ShortValueSerializer();
}
@Override
protected ShortValue[] getSortedTestData() {
Random rnd = new Random(874597969123412338L);
short rndShort = Integer.valueOf(rnd.nextInt()).shortValue();
if (rndShort < 0) {
rndShort = Integer.valueOf(-rndShort).shortValue();
}
if (rndShort == Short.MAX_VALUE) {
rndShort -= 3;
}
if (rndShort <= 2) {
rndShort += 3;
}
return new ShortValue[]{
new ShortValue(Short.MIN_VALUE),
new ShortValue(Integer.valueOf(-rndShort).shortValue()),
new ShortValue(Integer.valueOf(-1).shortValue()),
new ShortValue(Integer.valueOf(0).shortValue()),
new ShortValue(Integer.valueOf(1).shortValue()),
new ShortValue(Integer.valueOf(2).shortValue()),
new ShortValue(Integer.valueOf(rndShort).shortValue()),
new ShortValue(Short.MAX_VALUE)};
}
}
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package org.apache.flink.api.common.typeutils.base;
import org.apache.flink.api.common.typeutils.ComparatorTestBase;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.StringValue;
public class StringValueComparatorTest extends ComparatorTestBase<StringValue> {
@Override
protected TypeComparator<StringValue> createComparator(boolean ascending) {
return new StringValueComparator(ascending);
}
@Override
protected TypeSerializer<StringValue> createSerializer() {
return new StringValueSerializer();
}
@Override
protected StringValue[] getSortedTestData() {
return new StringValue[]{
new StringValue(""),
new StringValue("Lorem Ipsum Dolor Omit Longer"),
new StringValue("aaaa"),
new StringValue("abcd"),
new StringValue("abce"),
new StringValue("abdd"),
new StringValue("accd"),
new StringValue("bbcd")
};
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册