diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md index 5fbe27ceb2bae166197a26ed7e8cd79f76b56902..8248f5d7c84582dc6a63ed9e9ab05e9206cfd9e0 100644 --- a/docs/apis/programming_guide.md +++ b/docs/apis/programming_guide.md @@ -1493,6 +1493,7 @@ There are six different categories of data types: 4. **Regular Classes** 5. **Values** 6. **Hadoop Writables** +7. **Special Types** #### Tuples and Case Classes @@ -1651,6 +1652,12 @@ be altered, allowing programmers to reuse objects and take pressure off the garb You can use types that implement the `org.apache.hadoop.Writable` interface. The serialization logic defined in the `write()`and `readFields()` methods will be used for serialization. +#### Special Types + +You can use special types, including Scala's `Either`, `Option`, and `Try`. +The Java API has its own custom implementation of `Either`. +Similarly to Scala's `Either`, it represents a value of one two possible types, *Left* or *Right*. +`Either` can be useful for error handling or operators that need to output two different types of records. #### Type Erasure & Type Inference diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java new file mode 100644 index 0000000000000000000000000000000000000000..ba446a168aa65c83aea20a4ad588c4d84078dc78 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/Either.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +/** + * This type represents a value of one two possible types, Left or Right + * (a disjoint union), inspired by Scala's Either type. + * + * @param the type of Left + * @param the type of Right + */ +public abstract class Either { + + /** + * Create a Left value of Either + */ + public static Either left(L value) { + return new Left(value); + } + + /** + * Create a Right value of Either + */ + public static Either right(R value) { + return new Right(value); + } + + /** + * Retrieve the Left value of Either. + * @return the Left value + * @throws IllegalStateException if called on a Right + */ + public abstract L left() throws IllegalStateException; + + /** + * Retrieve the Right value of Either. + * @return the Right value + * @throws IllegalStateException if called on a Left + */ + public abstract R right() throws IllegalStateException; + + /** + * + * @return true if this is a Left value, false if this is a Right value + */ + public final boolean isLeft() { + return getClass() == Left.class; + } + + /** + * + * @return true if this is a Right value, false if this is a Left value + */ + public final boolean isRight() { + return getClass() == Right.class; + } + + private static class Left extends Either { + private final L value; + + public Left(L value) { + this.value = java.util.Objects.requireNonNull(value); + } + + @Override + public L left() { + return value; + } + + @Override + public R right() { + throw new IllegalStateException("Cannot retrieve Right value on a Left"); + } + + @Override + public boolean equals(Object object) { + if (object instanceof Left) { + final Left other = (Left) object; + return value.equals(other.value); + } + return false; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public String toString() { + return "Left(" + value.toString() + ")"; + } + } + + private static class Right extends Either { + private final R value; + + public Right(R value) { + this.value = java.util.Objects.requireNonNull(value); + } + + @Override + public L left() { + throw new IllegalStateException("Cannot retrieve Left value on a Right"); + } + + @Override + public R right() { + return value; + } + + @Override + public boolean equals(Object object) { + if (object instanceof Right) { + final Right other = (Right) object; + return value.equals(other.value); + } + return false; + } + + @Override + public int hashCode() { + return value.hashCode(); + } + + @Override + public String toString() { + return "Right(" + value.toString() + ")"; + } + } +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..40ed0c0f578b5ae218e6c461f632deb55d79b134 --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.runtime.EitherSerializer; + +/** + * A {@link TypeInformation} for the {@link Either} type of the Java API. + * + * @param the Left value type + * @param the Right value type + */ +public class EitherTypeInfo extends TypeInformation> { + + private static final long serialVersionUID = 1L; + + private final TypeInformation leftType; + + private final TypeInformation rightType; + + public EitherTypeInfo(TypeInformation leftType,TypeInformation rightType) { + this.leftType = leftType; + this.rightType = rightType; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @SuppressWarnings("unchecked") + @Override + public Class> getTypeClass() { + return (Class>) (Class) Either.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer> createSerializer(ExecutionConfig config) { + return new EitherSerializer(leftType.createSerializer(config), + rightType.createSerializer(config)); + } + + @Override + public String toString() { + return "Either <" + leftType.toString() + ", " + rightType.toString() + ">"; + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + if (obj instanceof EitherTypeInfo) { + EitherTypeInfo other = (EitherTypeInfo) obj; + + return other.canEqual(this) && + leftType.equals(other.leftType) && + rightType.equals(other.rightType); + } else { + return false; + } + } + + @Override + public int hashCode() { + return 17 * leftType.hashCode() + rightType.hashCode(); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof EitherTypeInfo; + } + +} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java new file mode 100644 index 0000000000000000000000000000000000000000..cfd1b5bd3e199648518d75a57acb3c5af9b5aa1d --- /dev/null +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializer.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import java.io.IOException; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.Either; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +/** + * A {@link TypeSerializer} for the {@ link Either} type of the Java class. + * + * @param the Left value type + * @param the Right value type + */ +public class EitherSerializer extends TypeSerializer> { + + private static final long serialVersionUID = 1L; + + private final TypeSerializer leftSerializer; + + private final TypeSerializer rightSerializer; + + public EitherSerializer(TypeSerializer leftSerializer, TypeSerializer rightSerializer) { + this.leftSerializer = leftSerializer; + this.rightSerializer = rightSerializer; + } + + @Override + public boolean isImmutableType() { + return leftSerializer.isImmutableType() && rightSerializer.isImmutableType(); + } + + @Override + public TypeSerializer> duplicate() { + TypeSerializer duplicateLeft = leftSerializer.duplicate(); + TypeSerializer duplicateRight = rightSerializer.duplicate(); + + if ((leftSerializer != duplicateLeft) || (rightSerializer != duplicateRight)) { + // stateful + return new EitherSerializer(duplicateLeft, duplicateRight); + } + else { + return this; + } + } + + + @Override + public Either createInstance() { + // We arbitrarily always create a Right value instance. + return Either.right(rightSerializer.createInstance()); + } + + @Override + public Either copy(Either from) { + if (from.isLeft()) { + L left = from.left(); + L copyLeft = leftSerializer.copy(left); + return Either.left(copyLeft); + } + else { + R right = from.right(); + R copyRight = rightSerializer.copy(right); + return Either.right(copyRight); + } + } + + @Override + public Either copy(Either from, Either reuse) { + if (from.isRight()) { + final R right = from.right(); + if (reuse.isRight()) { + R copyRight = rightSerializer.copy(right, reuse.right()); + return Either.right(copyRight); + } + else { + // if the reuse record isn't a right value, we cannot reuse + R copyRight = rightSerializer.copy(right); + return Either.right(copyRight); + } + } + else { + L left = from.left(); + // reuse record is never a left value because we always create a right instance + L copyLeft = leftSerializer.copy(left); + return Either.left(copyLeft); + } + } + + @Override + public int getLength() { + return -1; + } + + @Override + public void serialize(Either record, DataOutputView target) throws IOException { + if (record.isLeft()) { + target.writeBoolean(true); + leftSerializer.serialize(record.left(), target); + } + else { + target.writeBoolean(false); + rightSerializer.serialize(record.right(), target); + } + } + + @Override + public Either deserialize(DataInputView source) throws IOException { + boolean isLeft = source.readBoolean(); + if (isLeft) { + return Either.left(leftSerializer.deserialize(source)); + } + else { + return Either.right(rightSerializer.deserialize(source)); + } + } + + @Override + public Either deserialize(Either reuse, DataInputView source) throws IOException { + boolean isLeft = source.readBoolean(); + if (!isLeft) { + if (reuse.isRight()) { + return Either.right(rightSerializer.deserialize(reuse.right(), source)); + } + else { + // if the reuse record isn't a right value, we cannot reuse + return Either.right(rightSerializer.deserialize(source)); + } + } + else { + // reuse record is never a left value because we always create a right instance + return Either.left(leftSerializer.deserialize(source)); + } + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + boolean isLeft = source.readBoolean(); + target.writeBoolean(isLeft); + if (isLeft) { + leftSerializer.copy(source, target); + } + else { + rightSerializer.copy(source, target); + } + } + + @SuppressWarnings("unchecked") + @Override + public boolean equals(Object obj) { + if (obj instanceof EitherSerializer) { + EitherSerializer other = (EitherSerializer) obj; + + return other.canEqual(this) && + leftSerializer.equals(other.leftSerializer) && + rightSerializer.equals(other.rightSerializer); + } else { + return false; + } + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof EitherSerializer; + } + + @Override + public int hashCode() { + return 17 * leftSerializer.hashCode() + rightSerializer.hashCode(); + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java new file mode 100644 index 0000000000000000000000000000000000000000..b25513602da79a10f57ff6ea5a2e5135758c7af5 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/EitherTypeInfoTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.TestLogger; +import org.junit.Test; + +import static org.junit.Assert.*; + +public class EitherTypeInfoTest extends TestLogger { + + Either intEither = Either.left(1); + Either stringEither = Either.right("boo"); + Either> tuple2Either = Either.right(new Tuple2(42.0, 2l)); + + @Test + public void testEitherTypeEquality() { + EitherTypeInfo eitherInfo1 = new EitherTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + EitherTypeInfo eitherInfo2 = new EitherTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + assertEquals(eitherInfo1, eitherInfo2); + assertEquals(eitherInfo1.hashCode(), eitherInfo2.hashCode()); + } + + @Test + public void testEitherTypeInEquality() { + EitherTypeInfo eitherInfo1 = new EitherTypeInfo( + BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + EitherTypeInfo> eitherInfo2 = new EitherTypeInfo>( + BasicTypeInfo.INT_TYPE_INFO, new TupleTypeInfo>( + TypeExtractor.getForClass(Double.class), TypeExtractor.getForClass(String.class))); + + assertNotEquals(eitherInfo1, eitherInfo2); + assertNotEquals(eitherInfo1.hashCode(), eitherInfo2.hashCode()); + } +} diff --git a/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..198f6416d19b9c9ff261a7d2ca20d850dda3d2c9 --- /dev/null +++ b/flink-java/src/test/java/org/apache/flink/api/java/typeutils/runtime/EitherSerializerTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeutils.SerializerTestInstance; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.Either; +import org.apache.flink.api.java.typeutils.EitherTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.junit.Test; + +public class EitherSerializerTest { + + @SuppressWarnings("unchecked") + @Test + public void testStringDoubleEither() { + + Either[] testData = new Either[] { + Either.left("banana"), + Either.left(""), + Either.right(32.0), + Either.right(Double.MIN_VALUE), + Either.right(Double.MAX_VALUE)}; + + EitherTypeInfo eitherTypeInfo = (EitherTypeInfo) new EitherTypeInfo( + BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO); + EitherSerializer eitherSerializer = + (EitherSerializer) eitherTypeInfo.createSerializer(new ExecutionConfig()); + SerializerTestInstance> testInstance = + new EitherSerializerTestInstance>(eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData); + testInstance.testAll(); + } + + @SuppressWarnings("unchecked") + @Test + public void testEitherWithTuple() { + + Either, Double>[] testData = new Either[] { + Either.left(new Tuple2<>(2l, 9l)), + Either.left(new Tuple2<>(Long.MIN_VALUE, Long.MAX_VALUE)), + Either.right(32.0), + Either.right(Double.MIN_VALUE), + Either.right(Double.MAX_VALUE)}; + + EitherTypeInfo, Double> eitherTypeInfo = (EitherTypeInfo, Double>) + new EitherTypeInfo, Double>( + new TupleTypeInfo>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO), + BasicTypeInfo.DOUBLE_TYPE_INFO); + EitherSerializer, Double> eitherSerializer = + (EitherSerializer, Double>) eitherTypeInfo.createSerializer(new ExecutionConfig()); + SerializerTestInstance, Double>> testInstance = + new EitherSerializerTestInstance, Double>>( + eitherSerializer, eitherTypeInfo.getTypeClass(), -1, testData); + testInstance.testAll(); + } + + /** + * {@link org.apache.flink.api.common.typeutils.SerializerTestBase#testInstantiate()} + * checks that the type of the created instance is the same as the type class parameter. + * Since we arbitrarily create always create a Left instance we override this test. + */ + private class EitherSerializerTestInstance extends SerializerTestInstance { + + public EitherSerializerTestInstance(TypeSerializer serializer, + Class typeClass, int length, T[] testData) { + super(serializer, typeClass, length, testData); + } + + @Override + @Test + public void testInstantiate() { + try { + TypeSerializer serializer = getSerializer(); + + T instance = serializer.createInstance(); + assertNotNull("The created instance must not be null.", instance); + + Class type = getTypeClass(); + assertNotNull("The test is corrupt: type class is null.", type); + } + catch (Exception e) { + System.err.println(e.getMessage()); + e.printStackTrace(); + fail("Exception in test: " + e.getMessage()); + } + } + + } +}