提交 6b253d9f 编写于 作者: V vasia

[FLINK-3002] Add Either type, EitherTypeInfo, and EitherSerializer to the Java API

This closes #1371
上级 6888c9cf
...@@ -1493,6 +1493,7 @@ There are six different categories of data types: ...@@ -1493,6 +1493,7 @@ There are six different categories of data types:
4. **Regular Classes** 4. **Regular Classes**
5. **Values** 5. **Values**
6. **Hadoop Writables** 6. **Hadoop Writables**
7. **Special Types**
#### Tuples and Case Classes #### Tuples and Case Classes
...@@ -1651,6 +1652,12 @@ be altered, allowing programmers to reuse objects and take pressure off the garb ...@@ -1651,6 +1652,12 @@ be altered, allowing programmers to reuse objects and take pressure off the garb
You can use types that implement the `org.apache.hadoop.Writable` interface. The serialization logic You can use types that implement the `org.apache.hadoop.Writable` interface. The serialization logic
defined in the `write()`and `readFields()` methods will be used for serialization. defined in the `write()`and `readFields()` methods will be used for serialization.
#### Special Types
You can use special types, including Scala's `Either`, `Option`, and `Try`.
The Java API has its own custom implementation of `Either`.
Similarly to Scala's `Either`, it represents a value of one two possible types, *Left* or *Right*.
`Either` can be useful for error handling or operators that need to output two different types of records.
#### Type Erasure & Type Inference #### Type Erasure & Type Inference
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils;
/**
* This type represents a value of one two possible types, Left or Right
* (a disjoint union), inspired by Scala's Either type.
*
* @param <L> the type of Left
* @param <R> the type of Right
*/
public abstract class Either<L, R> {
/**
* Create a Left value of Either
*/
public static <L, R> Either<L, R> left(L value) {
return new Left<L, R>(value);
}
/**
* Create a Right value of Either
*/
public static <L, R> Either<L, R> right(R value) {
return new Right<L, R>(value);
}
/**
* Retrieve the Left value of Either.
* @return the Left value
* @throws IllegalStateException if called on a Right
*/
public abstract L left() throws IllegalStateException;
/**
* Retrieve the Right value of Either.
* @return the Right value
* @throws IllegalStateException if called on a Left
*/
public abstract R right() throws IllegalStateException;
/**
*
* @return true if this is a Left value, false if this is a Right value
*/
public final boolean isLeft() {
return getClass() == Left.class;
}
/**
*
* @return true if this is a Right value, false if this is a Left value
*/
public final boolean isRight() {
return getClass() == Right.class;
}
private static class Left<L, R> extends Either<L, R> {
private final L value;
public Left(L value) {
this.value = java.util.Objects.requireNonNull(value);
}
@Override
public L left() {
return value;
}
@Override
public R right() {
throw new IllegalStateException("Cannot retrieve Right value on a Left");
}
@Override
public boolean equals(Object object) {
if (object instanceof Left<?, ?>) {
final Left<?, ?> other = (Left<?, ?>) object;
return value.equals(other.value);
}
return false;
}
@Override
public int hashCode() {
return value.hashCode();
}
@Override
public String toString() {
return "Left(" + value.toString() + ")";
}
}
private static class Right<L, R> extends Either<L, R> {
private final R value;
public Right(R value) {
this.value = java.util.Objects.requireNonNull(value);
}
@Override
public L left() {
throw new IllegalStateException("Cannot retrieve Left value on a Right");
}
@Override
public R right() {
return value;
}
@Override
public boolean equals(Object object) {
if (object instanceof Right<?, ?>) {
final Right<?, ?> other = (Right<?, ?>) object;
return value.equals(other.value);
}
return false;
}
@Override
public int hashCode() {
return value.hashCode();
}
@Override
public String toString() {
return "Right(" + value.toString() + ")";
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
/**
* A {@link TypeInformation} for the {@link Either} type of the Java API.
*
* @param <L> the Left value type
* @param <R> the Right value type
*/
public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
private static final long serialVersionUID = 1L;
private final TypeInformation<L> leftType;
private final TypeInformation<R> rightType;
public EitherTypeInfo(TypeInformation<L> leftType,TypeInformation<R> rightType) {
this.leftType = leftType;
this.rightType = rightType;
}
@Override
public boolean isBasicType() {
return false;
}
@Override
public boolean isTupleType() {
return false;
}
@Override
public int getArity() {
return 1;
}
@Override
public int getTotalFields() {
return 1;
}
@SuppressWarnings("unchecked")
@Override
public Class<Either<L, R>> getTypeClass() {
return (Class<Either<L, R>>) (Class<?>) Either.class;
}
@Override
public boolean isKeyType() {
return false;
}
@Override
public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig config) {
return new EitherSerializer<L, R>(leftType.createSerializer(config),
rightType.createSerializer(config));
}
@Override
public String toString() {
return "Either <" + leftType.toString() + ", " + rightType.toString() + ">";
}
@SuppressWarnings("unchecked")
@Override
public boolean equals(Object obj) {
if (obj instanceof EitherTypeInfo) {
EitherTypeInfo<L, R> other = (EitherTypeInfo<L, R>) obj;
return other.canEqual(this) &&
leftType.equals(other.leftType) &&
rightType.equals(other.rightType);
} else {
return false;
}
}
@Override
public int hashCode() {
return 17 * leftType.hashCode() + rightType.hashCode();
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof EitherTypeInfo;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.Either;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A {@link TypeSerializer} for the {@ link Either} type of the Java class.
*
* @param <L> the Left value type
* @param <R> the Right value type
*/
public class EitherSerializer<L, R> extends TypeSerializer<Either<L, R>> {
private static final long serialVersionUID = 1L;
private final TypeSerializer<L> leftSerializer;
private final TypeSerializer<R> rightSerializer;
public EitherSerializer(TypeSerializer<L> leftSerializer, TypeSerializer<R> rightSerializer) {
this.leftSerializer = leftSerializer;
this.rightSerializer = rightSerializer;
}
@Override
public boolean isImmutableType() {
return leftSerializer.isImmutableType() && rightSerializer.isImmutableType();
}
@Override
public TypeSerializer<Either<L, R>> duplicate() {
TypeSerializer<L> duplicateLeft = leftSerializer.duplicate();
TypeSerializer<R> duplicateRight = rightSerializer.duplicate();
if ((leftSerializer != duplicateLeft) || (rightSerializer != duplicateRight)) {
// stateful
return new EitherSerializer<L, R>(duplicateLeft, duplicateRight);
}
else {
return this;
}
}
@Override
public Either<L, R> createInstance() {
// We arbitrarily always create a Right value instance.
return Either.right(rightSerializer.createInstance());
}
@Override
public Either<L, R> copy(Either<L, R> from) {
if (from.isLeft()) {
L left = from.left();
L copyLeft = leftSerializer.copy(left);
return Either.left(copyLeft);
}
else {
R right = from.right();
R copyRight = rightSerializer.copy(right);
return Either.right(copyRight);
}
}
@Override
public Either<L, R> copy(Either<L, R> from, Either<L, R> reuse) {
if (from.isRight()) {
final R right = from.right();
if (reuse.isRight()) {
R copyRight = rightSerializer.copy(right, reuse.right());
return Either.right(copyRight);
}
else {
// if the reuse record isn't a right value, we cannot reuse
R copyRight = rightSerializer.copy(right);
return Either.right(copyRight);
}
}
else {
L left = from.left();
// reuse record is never a left value because we always create a right instance
L copyLeft = leftSerializer.copy(left);
return Either.left(copyLeft);
}
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(Either<L, R> record, DataOutputView target) throws IOException {
if (record.isLeft()) {
target.writeBoolean(true);
leftSerializer.serialize(record.left(), target);
}
else {
target.writeBoolean(false);
rightSerializer.serialize(record.right(), target);
}
}
@Override
public Either<L, R> deserialize(DataInputView source) throws IOException {
boolean isLeft = source.readBoolean();
if (isLeft) {
return Either.left(leftSerializer.deserialize(source));
}
else {
return Either.right(rightSerializer.deserialize(source));
}
}
@Override
public Either<L, R> deserialize(Either<L, R> reuse, DataInputView source) throws IOException {
boolean isLeft = source.readBoolean();
if (!isLeft) {
if (reuse.isRight()) {
return Either.right(rightSerializer.deserialize(reuse.right(), source));
}
else {
// if the reuse record isn't a right value, we cannot reuse
return Either.right(rightSerializer.deserialize(source));
}
}
else {
// reuse record is never a left value because we always create a right instance
return Either.left(leftSerializer.deserialize(source));
}
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
boolean isLeft = source.readBoolean();
target.writeBoolean(isLeft);
if (isLeft) {
leftSerializer.copy(source, target);
}
else {
rightSerializer.copy(source, target);
}
}
@SuppressWarnings("unchecked")
@Override
public boolean equals(Object obj) {
if (obj instanceof EitherSerializer) {
EitherSerializer<L, R> other = (EitherSerializer<L, R>) obj;
return other.canEqual(this) &&
leftSerializer.equals(other.leftSerializer) &&
rightSerializer.equals(other.rightSerializer);
} else {
return false;
}
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof EitherSerializer;
}
@Override
public int hashCode() {
return 17 * leftSerializer.hashCode() + rightSerializer.hashCode();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import static org.junit.Assert.*;
public class EitherTypeInfoTest extends TestLogger {
Either<Integer, String> intEither = Either.left(1);
Either<Integer, String> stringEither = Either.right("boo");
Either<Integer, Tuple2<Double, Long>> tuple2Either = Either.right(new Tuple2<Double, Long>(42.0, 2l));
@Test
public void testEitherTypeEquality() {
EitherTypeInfo<Integer, String> eitherInfo1 = new EitherTypeInfo<Integer, String>(
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
EitherTypeInfo<Integer, String> eitherInfo2 = new EitherTypeInfo<Integer, String>(
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
assertEquals(eitherInfo1, eitherInfo2);
assertEquals(eitherInfo1.hashCode(), eitherInfo2.hashCode());
}
@Test
public void testEitherTypeInEquality() {
EitherTypeInfo<Integer, String> eitherInfo1 = new EitherTypeInfo<Integer, String>(
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
EitherTypeInfo<Integer, Tuple2<Double, Long>> eitherInfo2 = new EitherTypeInfo<Integer, Tuple2<Double, Long>>(
BasicTypeInfo.INT_TYPE_INFO, new TupleTypeInfo<Tuple2<Double, Long>>(
TypeExtractor.getForClass(Double.class), TypeExtractor.getForClass(String.class)));
assertNotEquals(eitherInfo1, eitherInfo2);
assertNotEquals(eitherInfo1.hashCode(), eitherInfo2.hashCode());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.SerializerTestInstance;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.Either;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.junit.Test;
public class EitherSerializerTest {
@SuppressWarnings("unchecked")
@Test
public void testStringDoubleEither() {
Either<String, Double>[] testData = new Either[] {
Either.left("banana"),
Either.left(""),
Either.right(32.0),
Either.right(Double.MIN_VALUE),
Either.right(Double.MAX_VALUE)};
EitherTypeInfo<String, Double> eitherTypeInfo = (EitherTypeInfo<String, Double>) new EitherTypeInfo<String, Double>(
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO);
EitherSerializer<String, Double> eitherSerializer =
(EitherSerializer<String, Double>) eitherTypeInfo.createSerializer(new ExecutionConfig());
SerializerTestInstance<Either<String, Double>> testInstance =
new EitherSerializerTestInstance<Either<String, Double>>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
testInstance.testAll();
}
@SuppressWarnings("unchecked")
@Test
public void testEitherWithTuple() {
Either<Tuple2<Long, Long>, Double>[] testData = new Either[] {
Either.left(new Tuple2<>(2l, 9l)),
Either.left(new Tuple2<>(Long.MIN_VALUE, Long.MAX_VALUE)),
Either.right(32.0),
Either.right(Double.MIN_VALUE),
Either.right(Double.MAX_VALUE)};
EitherTypeInfo<Tuple2<Long, Long>, Double> eitherTypeInfo = (EitherTypeInfo<Tuple2<Long, Long>, Double>)
new EitherTypeInfo<Tuple2<Long, Long>, Double>(
new TupleTypeInfo<Tuple2<Long, Long>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO),
BasicTypeInfo.DOUBLE_TYPE_INFO);
EitherSerializer<Tuple2<Long, Long>, Double> eitherSerializer =
(EitherSerializer<Tuple2<Long, Long>, Double>) eitherTypeInfo.createSerializer(new ExecutionConfig());
SerializerTestInstance<Either<Tuple2<Long, Long>, Double>> testInstance =
new EitherSerializerTestInstance<Either<Tuple2<Long, Long>, Double>>(
eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData);
testInstance.testAll();
}
/**
* {@link org.apache.flink.api.common.typeutils.SerializerTestBase#testInstantiate()}
* checks that the type of the created instance is the same as the type class parameter.
* Since we arbitrarily create always create a Left instance we override this test.
*/
private class EitherSerializerTestInstance<T> extends SerializerTestInstance<T> {
public EitherSerializerTestInstance(TypeSerializer<T> serializer,
Class<T> typeClass, int length, T[] testData) {
super(serializer, typeClass, length, testData);
}
@Override
@Test
public void testInstantiate() {
try {
TypeSerializer<T> serializer = getSerializer();
T instance = serializer.createInstance();
assertNotNull("The created instance must not be null.", instance);
Class<T> type = getTypeClass();
assertNotNull("The test is corrupt: type class is null.", type);
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Exception in test: " + e.getMessage());
}
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册