Commit b54499b1 authored by Robert Metzger

[FLINK-3093] Introduce annotations for interface stability in remaining modules

This closes #1428
Parent 2eb2a0ef
......@@ -29,6 +29,7 @@ import java.lang.annotation.Target;
* An experimental API might change between minor releases.
*/
@Documented
-@Target({ ElementType.TYPE, ElementType.METHOD })
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR })
@Public
public @interface Experimental {
}
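With the widened target list, @Experimental can now mark fields and constructors in addition to classes and methods (and @Internal, below, gains constructors). A minimal sketch with a hypothetical class:

```java
import org.apache.flink.annotation.Experimental;

public class ConnectorOptions {

	// ElementType.FIELD is now a permitted target
	@Experimental
	public static final int DEFAULT_RETRIES = 3;

	// ElementType.CONSTRUCTOR is now a permitted target
	@Experimental
	public ConnectorOptions() {}
}
```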
......@@ -28,6 +28,7 @@ import java.lang.annotation.Target;
* Developer APIs are stable but internal to Flink and might change across releases.
*/
@Documented
-@Target({ ElementType.TYPE, ElementType.METHOD })
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.CONSTRUCTOR })
@Public
public @interface Internal {
}
......@@ -32,4 +32,5 @@ import java.lang.annotation.Target;
*/
@Documented
@Target(ElementType.TYPE)
@Public
public @interface Public {}
......@@ -38,7 +38,6 @@ import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.InstantiationUtil;
public class AvroInputFormat<E> extends FileInputFormat<E> implements ResultTypeQueryable<E> {
private static final long serialVersionUID = 1L;
......
......@@ -15,11 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.io;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
......
......@@ -23,6 +23,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -42,6 +43,7 @@ import org.apache.hadoop.mapred.Reporter;
* This wrapper maps a Hadoop Mapper (mapred API) to a Flink FlatMapFunction.
*/
@SuppressWarnings("rawtypes")
@Public
public final class HadoopMapFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichFlatMapFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
......
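For context, the wrapper is used by handing an unmodified mapred-API Mapper to a regular flatMap. A minimal sketch, assuming the flink-hadoop-compatibility module is on the classpath; GrepMapper and the sample data are illustrative:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import java.io.IOException;

public class HadoopMapExample {

	// An unmodified mapred-API Mapper that keeps only lines containing "flink".
	public static final class GrepMapper extends MapReduceBase
			implements Mapper<LongWritable, Text, LongWritable, Text> {
		@Override
		public void map(LongWritable key, Text value,
				OutputCollector<LongWritable, Text> out, Reporter reporter) throws IOException {
			if (value.toString().contains("flink")) {
				out.collect(key, value);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		DataSet<Tuple2<LongWritable, Text>> lines = env.fromElements(
				new Tuple2<LongWritable, Text>(new LongWritable(0), new Text("hello flink")),
				new Tuple2<LongWritable, Text>(new LongWritable(1), new Text("hello hadoop")));
		// The Hadoop Mapper runs inside a Flink flatMap operator.
		lines.flatMap(new HadoopMapFunction<LongWritable, Text, LongWritable, Text>(new GrepMapper()))
				.print();
	}
}
```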
......@@ -23,6 +23,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -45,6 +46,7 @@ import org.apache.hadoop.mapred.Reporter;
* This wrapper maps a Hadoop Reducer and Combiner (mapred API) to a combinable Flink GroupReduceFunction.
*/
@SuppressWarnings("rawtypes")
@Public
public final class HadoopReduceCombineFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements GroupCombineFunction<Tuple2<KEYIN,VALUEIN>, Tuple2<KEYIN,VALUEIN>>,
......
......@@ -23,6 +23,7 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -44,6 +45,7 @@ import org.apache.hadoop.mapred.Reporter;
* This wrapper maps a Hadoop Reducer (mapred API) to a non-combinable Flink GroupReduceFunction.
*/
@SuppressWarnings("rawtypes")
@Public
public final class HadoopReduceFunction<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
extends RichGroupReduceFunction<Tuple2<KEYIN,VALUEIN>,Tuple2<KEYOUT,VALUEOUT>>
implements ResultTypeQueryable<Tuple2<KEYOUT,VALUEOUT>>, Serializable {
......
......@@ -15,8 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.addons.hbase;
import java.io.IOException;
......
......@@ -239,6 +239,7 @@ public interface RuntimeContext {
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@Experimental
<T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties);
/**
......@@ -282,6 +283,7 @@ public interface RuntimeContext {
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@Experimental
<T> ListState<T> getListState(ListStateDescriptor<T> stateProperties);
/**
......@@ -321,6 +323,7 @@ public interface RuntimeContext {
* @throws UnsupportedOperationException Thrown, if no partitioned state is available for the
* function (function is not part of a KeyedStream).
*/
@Experimental
<T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties);
/**
......@@ -380,6 +383,7 @@ public interface RuntimeContext {
* @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead.
*/
@Deprecated
@Experimental
<S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
/**
......@@ -432,5 +436,6 @@ public interface RuntimeContext {
* @deprecated Use the more expressive {@link #getState(ValueStateDescriptor)} instead.
*/
@Deprecated
@Experimental
<S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
}
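The state accessors annotated above are meant to be called from rich functions running on a KeyedStream. A minimal sketch, assuming the ValueStateDescriptor(name, class, defaultValue) constructor shape of this era; all names are illustrative:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Emits a running count per key. getState(...) only works on a KeyedStream;
// elsewhere it throws UnsupportedOperationException (see AbstractRuntimeUDFContext below).
public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

	private transient ValueState<Long> count;

	@Override
	public void open(Configuration parameters) {
		count = getRuntimeContext().getState(
				new ValueStateDescriptor<>("count", Long.class, 0L));
	}

	@Override
	public void flatMap(Tuple2<String, Long> in, Collector<Tuple2<String, Long>> out) throws Exception {
		long updated = count.value() + 1;
		count.update(updated);
		out.collect(Tuple2.of(in.f0, updated));
	}
}
```

It would be applied as, e.g., stream.keyBy(0).flatMap(new CountPerKey()).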
......@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.Future;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
......@@ -175,18 +176,21 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
}
@Override
@Experimental
public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
@Override
@Experimental
public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
}
@Override
@Experimental
public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
......@@ -194,6 +198,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
@Override
@Deprecated
@Experimental
public <S> OperatorState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
......@@ -201,6 +206,7 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
@Override
@Deprecated
@Experimental
public <S> OperatorState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
throw new UnsupportedOperationException(
"This state is only accessible by functions executed on a KeyedStream");
......
......@@ -23,7 +23,6 @@ import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
-import org.apache.flink.annotation.Public;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.parser.FieldParser;
......@@ -41,7 +40,6 @@ import java.util.ArrayList;
import java.util.Map;
import java.util.TreeMap;
-@Public
public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT> {
private static final Logger LOG = LoggerFactory.getLogger(GenericCsvInputFormat.class);
......
......@@ -19,6 +19,7 @@
package org.apache.flink.api.common.io.statistics;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
/**
......@@ -30,16 +31,19 @@ public interface BaseStatistics {
/**
* Constant indicating that the input size is unknown.
*/
@Experimental
public static final long SIZE_UNKNOWN = -1;
/**
* Constant indicating that the number of records is unknown.
*/
@Experimental
public static final long NUM_RECORDS_UNKNOWN = -1;
/**
* Constant indicating that average record width is unknown.
*/
@Experimental
public static final float AVG_RECORD_BYTES_UNKNOWN = -1.0f;
// --------------------------------------------------------------------------------------------
......@@ -49,6 +53,7 @@ public interface BaseStatistics {
*
* @return The total size of the input, in bytes.
*/
@Experimental
public long getTotalInputSize();
/**
......@@ -56,6 +61,7 @@ public interface BaseStatistics {
*
* @return The number of records in the input.
*/
@Experimental
public long getNumberOfRecords();
/**
......@@ -63,5 +69,6 @@ public interface BaseStatistics {
*
* @return The average width of a record in bytes.
*/
@Experimental
public float getAverageRecordWidth();
}
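An input format with no advance knowledge of its input would report exactly these constants. A minimal sketch (the class name is hypothetical):

```java
import org.apache.flink.api.common.io.statistics.BaseStatistics;

// Statistics object for an input whose size and cardinality cannot be estimated.
public class UnknownStatistics implements BaseStatistics {

	@Override
	public long getTotalInputSize() {
		return BaseStatistics.SIZE_UNKNOWN;
	}

	@Override
	public long getNumberOfRecords() {
		return BaseStatistics.NUM_RECORDS_UNKNOWN;
	}

	@Override
	public float getAverageRecordWidth() {
		return BaseStatistics.AVG_RECORD_BYTES_UNKNOWN;
	}
}
```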
......@@ -21,13 +21,11 @@ package org.apache.flink.api.common.operators;
import java.util.HashMap;
import java.util.Map;
-import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.operators.util.FieldSet;
/**
* Container for the semantic properties associated to a single input operator.
*/
-@Public
public class SingleInputSemanticProperties implements SemanticProperties {
private static final long serialVersionUID = 1L;
......
......@@ -18,6 +18,7 @@
package org.apache.flink.api.common.typeinfo;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeComparator;
......@@ -29,6 +30,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
* In contrast to atomic types are composite types, where the type information is aware of the individual
* fields and individual fields may be used as a key.
*/
@Public
public interface AtomicType<T> {
/**
......
......@@ -23,12 +23,15 @@ import java.util.Map;
import java.util.Objects;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.StringArraySerializer;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
@Public
public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
private static final long serialVersionUID = 1L;
......@@ -58,45 +61,54 @@ public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
// --------------------------------------------------------------------------------------------
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return 1;
}
@Override
@Experimental
public int getTotalFields() {
return 1;
}
@Override
@Experimental
public Class<T> getTypeClass() {
return this.arrayClass;
}
@Experimental
public Class<C> getComponentTypeClass() {
return this.componentInfo.getTypeClass();
}
@Experimental
public TypeInformation<C> getComponentInfo() {
return componentInfo;
}
@Override
@Experimental
public boolean isKeyType() {
return false;
}
@Override
@SuppressWarnings("unchecked")
@Experimental
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
// special case the string array
if (componentInfo.getTypeClass().equals(String.class)) {
......@@ -140,6 +152,7 @@ public final class BasicArrayTypeInfo<T, C> extends TypeInformation<T> {
// --------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
@Experimental
public static <X, C> BasicArrayTypeInfo<X, C> getInfoFor(Class<X> type) {
if (!type.isArray()) {
throw new InvalidTypesException("The given class is no array.");
......
......@@ -26,6 +26,8 @@ import java.util.Map;
import java.util.Objects;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.TypeComparator;
......@@ -56,6 +58,7 @@ import org.apache.flink.api.common.typeutils.base.VoidSerializer;
/**
* Type information for primitive types (int, long, double, byte, ...), String, Date, and Void.
*/
@Public
public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
private static final long serialVersionUID = -430955220409131770L;
......@@ -97,6 +100,7 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
* Returns whether this type should be automatically casted to
* the target type in an arithmetic operation.
*/
@Experimental
public boolean shouldAutocastTo(BasicTypeInfo<?> to) {
for (Class<?> possibleTo: possibleCastTargetTypes) {
if (possibleTo.equals(to.getTypeClass())) {
......@@ -107,41 +111,49 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
}
@Override
@Experimental
public boolean isBasicType() {
return true;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return 1;
}
@Override
@Experimental
public int getTotalFields() {
return 1;
}
@Override
@Experimental
public Class<T> getTypeClass() {
return this.clazz;
}
@Override
@Experimental
public boolean isKeyType() {
return true;
}
@Override
@Experimental
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return this.serializer;
}
@Override
@Experimental
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
if (comparatorClass != null) {
return instantiateComparator(comparatorClass, sortOrderAscending);
......@@ -183,7 +195,8 @@ public class BasicTypeInfo<T> extends TypeInformation<T> implements AtomicType<T
}
// --------------------------------------------------------------------------------------------
@Experimental
public static <X> BasicTypeInfo<X> getInfoFor(Class<X> type) {
if (type == null) {
throw new NullPointerException();
......
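A usage sketch for getInfoFor and shouldAutocastTo; the expected results in the comments assume the usual Java widening rules (int widens to long but never narrows to byte):

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

public class AutocastDemo {
	public static void main(String[] args) {
		BasicTypeInfo<Integer> intInfo = BasicTypeInfo.getInfoFor(Integer.class); // INT_TYPE_INFO
		System.out.println(intInfo.shouldAutocastTo(BasicTypeInfo.LONG_TYPE_INFO)); // true: widening
		System.out.println(intInfo.shouldAutocastTo(BasicTypeInfo.BYTE_TYPE_INFO)); // false: narrowing
	}
}
```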
......@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeinfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -28,6 +29,7 @@ import java.util.Set;
/**
* Type information for numeric fractional primitive types (double, float).
*/
@Public
public class FractionalTypeInfo<T> extends NumericTypeInfo<T> {
private static final long serialVersionUID = 554334260950199994L;
......
......@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeinfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -28,6 +29,7 @@ import java.util.Set;
/**
* Type information for numeric integer primitive types: int, long, byte, short, character.
*/
@Public
public class IntegerTypeInfo<T> extends NumericTypeInfo<T> {
private static final long serialVersionUID = -8068827354966766955L;
......
......@@ -18,6 +18,8 @@
package org.apache.flink.api.common.typeinfo;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.types.Nothing;
......@@ -25,41 +27,49 @@ import org.apache.flink.types.Nothing;
/**
* Placeholder type information for the {@link Nothing} type.
*/
@Public
public class NothingTypeInfo extends TypeInformation<Nothing> {
private static final long serialVersionUID = 1L;
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return 0;
}
@Override
@Experimental
public int getTotalFields() {
return 0;
}
@Override
@Experimental
public Class<Nothing> getTypeClass() {
return Nothing.class;
}
@Override
@Experimental
public boolean isKeyType() {
return false;
}
@Override
@Experimental
public TypeSerializer<Nothing> createSerializer(ExecutionConfig executionConfig) {
throw new RuntimeException("The Nothing type cannot have a serializer.");
}
......
......@@ -20,6 +20,7 @@ package org.apache.flink.api.common.typeinfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -28,6 +29,7 @@ import java.util.Set;
/**
* Type information for numeric primitive types: int, long, double, byte, short, float, char.
*/
@Public
public abstract class NumericTypeInfo<T> extends BasicTypeInfo<T> {
private static final long serialVersionUID = -5937777910658986986L;
......
......@@ -23,6 +23,8 @@ import java.util.Map;
import java.util.Objects;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -50,6 +52,7 @@ import org.apache.flink.api.common.typeutils.base.array.ShortPrimitiveArraySeria
*
* @param <T> The type represented by this type information, e.g., int[], double[], long[]
*/
@Public
public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
private static final long serialVersionUID = 1L;
......@@ -92,36 +95,43 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
// --------------------------------------------------------------------------------------------
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return 1;
}
@Override
@Experimental
public int getTotalFields() {
return 1;
}
@Override
@Experimental
public Class<T> getTypeClass() {
return this.arrayClass;
}
@Override
@Experimental
public boolean isKeyType() {
return true;
}
@Override
@Experimental
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return this.serializer;
}
......@@ -130,6 +140,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
* Gets the class that represents the component type.
* @return The class of the component type.
*/
@Experimental
public Class<?> getComponentClass() {
return this.arrayClass.getComponentType();
}
......@@ -138,6 +149,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
* Gets the type information of the component type.
* @return The type information of the component type.
*/
@Experimental
public TypeInformation<?> getComponentType() {
return BasicTypeInfo.getInfoFor(getComponentClass());
}
......@@ -183,6 +195,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
* @throws InvalidTypesException Thrown, if the given class does not represent an array.
*/
@SuppressWarnings("unchecked")
@Experimental
public static <X> PrimitiveArrayTypeInfo<X> getInfoFor(Class<X> type) {
if (!type.isArray()) {
throw new InvalidTypesException("The given class is no array.");
......@@ -208,6 +221,7 @@ public class PrimitiveArrayTypeInfo<T> extends TypeInformation<T> implements Ato
}
@Override
@Experimental
public PrimitiveArrayComparator<T, ?> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
try {
return comparatorClass.getConstructor(boolean.class).newInstance(sortOrderAscending);
......
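A usage sketch for the primitive-array factory (the demo class is hypothetical):

```java
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;

public class PrimitiveArrayDemo {
	public static void main(String[] args) {
		PrimitiveArrayTypeInfo<int[]> info = PrimitiveArrayTypeInfo.getInfoFor(int[].class);
		System.out.println(info.getComponentClass()); // int
		System.out.println(info.getComponentType());  // the BasicTypeInfo for Integer
		System.out.println(info.isKeyType());         // true
	}
}
```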
......@@ -18,6 +18,7 @@
package org.apache.flink.api.common.typeinfo;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -80,6 +81,7 @@ public abstract class TypeInformation<T> implements Serializable {
*
* @return True, if this type information describes a basic type, false otherwise.
*/
@Experimental
public abstract boolean isBasicType();
/**
......@@ -88,6 +90,7 @@ public abstract class TypeInformation<T> implements Serializable {
*
* @return True, if this type information describes a tuple type, false otherwise.
*/
@Experimental
public abstract boolean isTupleType();
/**
......@@ -95,6 +98,7 @@ public abstract class TypeInformation<T> implements Serializable {
*
* @return Gets the number of fields in this type without nesting.
*/
@Experimental
public abstract int getArity();
/**
......@@ -105,6 +109,7 @@ public abstract class TypeInformation<T> implements Serializable {
*
* @return The number of fields in this type, including its sub-fields (for composite types)
*/
@Experimental
public abstract int getTotalFields();
/**
......@@ -112,6 +117,7 @@ public abstract class TypeInformation<T> implements Serializable {
*
* @return The class of the type represented by this type information.
*/
@Experimental
public abstract Class<T> getTypeClass();
/**
......@@ -119,6 +125,7 @@ public abstract class TypeInformation<T> implements Serializable {
*
* @return The list of generic parameters. This list can be empty.
*/
@Experimental
public List<TypeInformation<?>> getGenericParameters() {
// Return an empty list as the default implementation
return new LinkedList<>();
......@@ -130,12 +137,14 @@ public abstract class TypeInformation<T> implements Serializable {
*
* @return True, if the type can be used as a key, false otherwise.
*/
@Experimental
public abstract boolean isKeyType();
/**
* Checks whether this type can be used as a key for sorting.
* The order produced by sorting this type must be meaningful.
*/
@Experimental
public boolean isSortKeyType() {
return isKeyType();
}
......@@ -147,6 +156,7 @@ public abstract class TypeInformation<T> implements Serializable {
* @param config The config used to parameterize the serializer.
* @return A serializer for this type.
*/
@Experimental
public abstract TypeSerializer<T> createSerializer(ExecutionConfig config);
@Override
......
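The distinction between getArity and getTotalFields is easiest to see with a nested tuple. A minimal sketch, assuming TupleTypeInfo's varargs constructor:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class ArityDemo {
	public static void main(String[] args) {
		TypeInformation<Tuple2<String, Tuple2<Integer, Integer>>> info =
				new TupleTypeInfo<>(
						BasicTypeInfo.STRING_TYPE_INFO,
						new TupleTypeInfo<Tuple2<Integer, Integer>>(
								BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO));
		System.out.println(info.getArity());       // 2: top-level fields only
		System.out.println(info.getTotalFields()); // 3: leaf fields, with nesting flattened
	}
}
```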
......@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Objects;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -33,12 +35,14 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
*
* The class is taking care of serialization and comparators for Tuples as well.
*/
@Public
public abstract class CompositeType<T> extends TypeInformation<T> {
private static final long serialVersionUID = 1L;
private final Class<T> typeClass;
@Experimental
public CompositeType(Class<T> typeClass) {
this.typeClass = Preconditions.checkNotNull(typeClass);
}
......@@ -48,6 +52,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
*
* @return Type class of the composite type
*/
@Experimental
public Class<T> getTypeClass() {
return typeClass;
}
......@@ -58,6 +63,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
* @param fieldExpression The field expression for which the flat field descriptors are computed.
* @return The list of descriptors for the flat fields which are specified by the field expression.
*/
@Experimental
public List<FlatFieldDescriptor> getFlatFields(String fieldExpression) {
List<FlatFieldDescriptor> result = new ArrayList<FlatFieldDescriptor>();
this.getFlatFields(fieldExpression, 0, result);
......@@ -71,6 +77,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
* @param offset The offset to use when computing the positions of the flat fields.
* @param result The list into which all flat field descriptors are inserted.
*/
@Experimental
public abstract void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result);
/**
......@@ -80,6 +87,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
* @param fieldExpression The field expression for which the field of which the type is returned.
* @return The type of the field at the given field expression.
*/
@Experimental
public abstract <X> TypeInformation<X> getTypeAt(String fieldExpression);
/**
......@@ -88,8 +96,10 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
* @param pos The position of the (unnested) field in this composite type.
* @return The type of the field at the given position.
*/
@Experimental
public abstract <X> TypeInformation<X> getTypeAt(int pos);
@Experimental
protected abstract TypeComparatorBuilder<T> createTypeComparatorBuilder();
/**
......@@ -97,6 +107,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
* to create the actual comparators
* @return The comparator
*/
@Experimental
public TypeComparator<T> createComparator(int[] logicalKeyFields, boolean[] orders, int logicalFieldOffset, ExecutionConfig config) {
TypeComparatorBuilder<T> builder = createTypeComparatorBuilder();
......@@ -158,6 +169,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
// --------------------------------------------------------------------------------------------
@Experimental
protected interface TypeComparatorBuilder<T> {
void initializeTypeComparatorBuilder(int size);
......@@ -166,6 +178,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
TypeComparator<T> createTypeComparator(ExecutionConfig config);
}
@Experimental
public static class FlatFieldDescriptor {
private int keyPosition;
private TypeInformation<?> type;
......@@ -196,11 +209,13 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
/**
* Returns true when this type has a composite field with the given name.
*/
@Experimental
public boolean hasField(String fieldName) {
return getFieldIndex(fieldName) >= 0;
}
@Override
@Experimental
public boolean isKeyType() {
for(int i=0;i<this.getArity();i++) {
if (!this.getTypeAt(i).isKeyType()) {
......@@ -211,6 +226,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
}
@Override
@Experimental
public boolean isSortKeyType() {
for(int i=0;i<this.getArity();i++) {
if (!this.getTypeAt(i).isSortKeyType()) {
......@@ -224,6 +240,7 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
* Returns the names of the composite fields of this type. The order of the returned array must
* be consistent with the internal field index ordering.
*/
@Experimental
public abstract String[] getFieldNames();
/**
......@@ -235,16 +252,20 @@ public abstract class CompositeType<T> extends TypeInformation<T> {
* This is used when translating a DataSet or DataStream to an Expression Table, when
* initially renaming the fields of the underlying type.
*/
@Experimental
public boolean hasDeterministicFieldOrder() {
return false;
}
/**
* Returns the field index of the composite field of the given name.
*
* @return The field index or -1 if this type does not have a field of the given name.
*/
@Experimental
public abstract int getFieldIndex(String fieldName);
@Experimental
public static class InvalidFieldReferenceException extends IllegalArgumentException {
private static final long serialVersionUID = 1L;
......
......@@ -18,6 +18,7 @@
package org.apache.flink.api.java.functions;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.Function;
import java.io.Serializable;
......@@ -31,6 +32,7 @@ import java.io.Serializable;
* @param <IN> Type of objects to extract the key from.
* @param <KEY> Type of key.
*/
@Public
public interface KeySelector<IN, KEY> extends Function, Serializable {
/**
......
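A typical selector extracts one tuple field, e.g. for grouping or keying. A minimal sketch (WordKey is illustrative):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

// Uses the first tuple field as the key, e.g. words.groupBy(new WordKey()).
public class WordKey implements KeySelector<Tuple2<String, Integer>, String> {
	@Override
	public String getKey(Tuple2<String, Integer> value) {
		return value.f0;
	}
}
```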
......@@ -18,6 +18,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.types.NullFieldException;
......@@ -33,6 +34,7 @@ import org.apache.flink.types.NullFieldException;
* Tuples are in principle serializable. However, they may contain non-serializable fields,
* in which case serialization will fail.
*/
@Public
public abstract class Tuple implements java.io.Serializable {
private static final long serialVersionUID = 1L;
......
......@@ -17,6 +17,8 @@
*/
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import java.io.ObjectStreamException;
/**
......@@ -27,6 +29,7 @@ import java.io.ObjectStreamException;
*
* @see Tuple
*/
@Public
public class Tuple0 extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -39,6 +40,7 @@ import org.apache.flink.util.StringUtils;
*
* @param <T0> The type of field 0
*/
@Public
public class Tuple1<T0> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -48,6 +49,7 @@ import org.apache.flink.util.StringUtils;
* @param <T8> The type of field 8
* @param <T9> The type of field 9
*/
@Public
public class Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -49,6 +50,7 @@ import org.apache.flink.util.StringUtils;
* @param <T9> The type of field 9
* @param <T10> The type of field 10
*/
@Public
public class Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -50,6 +51,7 @@ import org.apache.flink.util.StringUtils;
* @param <T10> The type of field 10
* @param <T11> The type of field 11
*/
@Public
public class Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -51,6 +52,7 @@ import org.apache.flink.util.StringUtils;
* @param <T11> The type of field 11
* @param <T12> The type of field 12
*/
@Public
public class Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -52,6 +53,7 @@ import org.apache.flink.util.StringUtils;
* @param <T12> The type of field 12
* @param <T13> The type of field 13
*/
@Public
public class Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -53,6 +54,7 @@ import org.apache.flink.util.StringUtils;
* @param <T13> The type of field 13
* @param <T14> The type of field 14
*/
@Public
public class Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -54,6 +55,7 @@ import org.apache.flink.util.StringUtils;
* @param <T14> The type of field 14
* @param <T15> The type of field 15
*/
@Public
public class Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -55,6 +56,7 @@ import org.apache.flink.util.StringUtils;
* @param <T15> The type of field 15
* @param <T16> The type of field 16
*/
@Public
public class Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -56,6 +57,7 @@ import org.apache.flink.util.StringUtils;
* @param <T16> The type of field 16
* @param <T17> The type of field 17
*/
@Public
public class Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -57,6 +58,7 @@ import org.apache.flink.util.StringUtils;
* @param <T17> The type of field 17
* @param <T18> The type of field 18
*/
@Public
public class Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -40,6 +41,7 @@ import org.apache.flink.util.StringUtils;
* @param <T0> The type of field 0
* @param <T1> The type of field 1
*/
@Public
public class Tuple2<T0, T1> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -58,6 +59,7 @@ import org.apache.flink.util.StringUtils;
* @param <T18> The type of field 18
* @param <T19> The type of field 19
*/
@Public
public class Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -59,6 +60,7 @@ import org.apache.flink.util.StringUtils;
* @param <T19> The type of field 19
* @param <T20> The type of field 20
*/
@Public
public class Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -60,6 +61,7 @@ import org.apache.flink.util.StringUtils;
* @param <T20> The type of field 20
* @param <T21> The type of field 21
*/
@Public
public class Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -61,6 +62,7 @@ import org.apache.flink.util.StringUtils;
* @param <T21> The type of field 21
* @param <T22> The type of field 22
*/
@Public
public class Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -62,6 +63,7 @@ import org.apache.flink.util.StringUtils;
* @param <T22> The type of field 22
* @param <T23> The type of field 23
*/
@Public
public class Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -63,6 +64,7 @@ import org.apache.flink.util.StringUtils;
* @param <T23> The type of field 23
* @param <T24> The type of field 24
*/
@Public
public class Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -41,6 +42,7 @@ import org.apache.flink.util.StringUtils;
* @param <T1> The type of field 1
* @param <T2> The type of field 2
*/
@Public
public class Tuple3<T0, T1, T2> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -42,6 +43,7 @@ import org.apache.flink.util.StringUtils;
* @param <T2> The type of field 2
* @param <T3> The type of field 3
*/
@Public
public class Tuple4<T0, T1, T2, T3> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -43,6 +44,7 @@ import org.apache.flink.util.StringUtils;
* @param <T3> The type of field 3
* @param <T4> The type of field 4
*/
@Public
public class Tuple5<T0, T1, T2, T3, T4> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -44,6 +45,7 @@ import org.apache.flink.util.StringUtils;
* @param <T4> The type of field 4
* @param <T5> The type of field 5
*/
@Public
public class Tuple6<T0, T1, T2, T3, T4, T5> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -45,6 +46,7 @@ import org.apache.flink.util.StringUtils;
* @param <T5> The type of field 5
* @param <T6> The type of field 6
*/
@Public
public class Tuple7<T0, T1, T2, T3, T4, T5, T6> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -46,6 +47,7 @@ import org.apache.flink.util.StringUtils;
* @param <T6> The type of field 6
* @param <T7> The type of field 7
*/
@Public
public class Tuple8<T0, T1, T2, T3, T4, T5, T6, T7> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -25,6 +25,7 @@
package org.apache.flink.api.java.tuple;
import org.apache.flink.annotation.Public;
import org.apache.flink.util.StringUtils;
/**
......@@ -47,6 +48,7 @@ import org.apache.flink.util.StringUtils;
* @param <T7> The type of field 7
* @param <T8> The type of field 8
*/
@Public
public class Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8> extends Tuple {
private static final long serialVersionUID = 1L;
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple0;
@Public
public class Tuple0Builder {
private List<Tuple0> tuples = new ArrayList<Tuple0>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple10;
@Public
public class Tuple10Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> {
private List<Tuple10<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple11;
@Public
public class Tuple11Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> {
private List<Tuple11<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple12;
@Public
public class Tuple12Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11> {
private List<Tuple12<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple13;
@Public
public class Tuple13Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12> {
private List<Tuple13<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple14;
@Public
public class Tuple14Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13> {
private List<Tuple14<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple15;
@Public
public class Tuple15Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14> {
private List<Tuple15<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple16;
@Public
public class Tuple16Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15> {
private List<Tuple16<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple17;
@Public
public class Tuple17Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16> {
private List<Tuple17<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple18;
@Public
public class Tuple18Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17> {
private List<Tuple18<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple19;
@Public
public class Tuple19Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18> {
private List<Tuple19<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple1;
@Public
public class Tuple1Builder<T0> {
private List<Tuple1<T0>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple20;
@Public
public class Tuple20Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19> {
private List<Tuple20<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple21;
@Public
public class Tuple21Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20> {
private List<Tuple21<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple22;
@Public
public class Tuple22Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21> {
private List<Tuple22<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple23;
@Public
public class Tuple23Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22> {
private List<Tuple23<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple24;
@Public
public class Tuple24Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23> {
private List<Tuple24<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple25;
@Public
public class Tuple25Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24> {
private List<Tuple25<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, T23, T24>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple2;
@Public
public class Tuple2Builder<T0, T1> {
private List<Tuple2<T0, T1>> tuples = new ArrayList<>();
......
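The builders collect tuples and hand them back as an array. A minimal sketch, assuming add(...) returns the builder and build() returns a Tuple2 array, as in these builder classes:

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.builder.Tuple2Builder;

public class TupleBuilderDemo {
	public static void main(String[] args) {
		Tuple2<String, Integer>[] tuples = new Tuple2Builder<String, Integer>()
				.add("one", 1)
				.add("two", 2)
				.build();
		System.out.println(tuples.length); // 2
	}
}
```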
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple3;
@Public
public class Tuple3Builder<T0, T1, T2> {
private List<Tuple3<T0, T1, T2>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple4;
@Public
public class Tuple4Builder<T0, T1, T2, T3> {
private List<Tuple4<T0, T1, T2, T3>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple5;
@Public
public class Tuple5Builder<T0, T1, T2, T3, T4> {
private List<Tuple5<T0, T1, T2, T3, T4>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple6;
@Public
public class Tuple6Builder<T0, T1, T2, T3, T4, T5> {
private List<Tuple6<T0, T1, T2, T3, T4, T5>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple7;
@Public
public class Tuple7Builder<T0, T1, T2, T3, T4, T5, T6> {
private List<Tuple7<T0, T1, T2, T3, T4, T5, T6>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple8;
@Public
public class Tuple8Builder<T0, T1, T2, T3, T4, T5, T6, T7> {
private List<Tuple8<T0, T1, T2, T3, T4, T5, T6, T7>> tuples = new ArrayList<>();
......
......@@ -28,8 +28,10 @@ package org.apache.flink.api.java.tuple.builder;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.java.tuple.Tuple9;
@Public
public class Tuple9Builder<T0, T1, T2, T3, T4, T5, T6, T7, T8> {
private List<Tuple9<T0, T1, T2, T3, T4, T5, T6, T7, T8>> tuples = new ArrayList<>();
......
......@@ -20,6 +20,8 @@
package org.apache.flink.api.java.typeutils;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import java.lang.reflect.Type;
......@@ -39,7 +41,9 @@ import java.util.List;
* This class is checked by the AvroPojoTest.
* @param <T>
*/
@Public
public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {
@Experimental
public AvroTypeInfo(Class<T> typeClass) {
super(typeClass, generateFieldsFromAvroSchema(typeClass));
}
......
......@@ -18,6 +18,8 @@
package org.apache.flink.api.java.typeutils;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
......@@ -30,6 +32,7 @@ import org.apache.flink.types.Either;
* @param <L> the Left value type
* @param <R> the Right value type
*/
@Public
public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
private static final long serialVersionUID = 1L;
......@@ -38,43 +41,51 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
private final TypeInformation<R> rightType;
@Experimental
public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
this.leftType = leftType;
this.rightType = rightType;
}
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return 1;
}
@Override
@Experimental
public int getTotalFields() {
return 1;
}
@SuppressWarnings("unchecked")
@Override
@Experimental
public Class<Either<L, R>> getTypeClass() {
return (Class<Either<L, R>>) (Class<?>) Either.class;
}
@Override
@Experimental
public boolean isKeyType() {
return false;
}
@Override
@Experimental
public TypeSerializer<Either<L, R>> createSerializer(ExecutionConfig config) {
return new EitherSerializer<L, R>(leftType.createSerializer(config),
rightType.createSerializer(config));
......
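A minimal construction sketch; note that an Either value counts as a single atomic field:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.types.Either;

public class EitherDemo {
	public static void main(String[] args) {
		TypeInformation<Either<Integer, String>> info = new EitherTypeInfo<>(
				BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
		System.out.println(info.getArity()); // 1: Either is one field, not a composite
	}
}
```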
......@@ -19,6 +19,7 @@
package org.apache.flink.api.java.typeutils;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -26,18 +27,21 @@ import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.EnumComparator;
import org.apache.flink.api.common.typeutils.base.EnumSerializer;
import org.apache.flink.annotation.Public;
/**
* A {@link TypeInformation} for java enumeration types.
*
* @param <T> The type represented by this type information.
*/
@Public
public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implements AtomicType<T> {
private static final long serialVersionUID = 8936740290137178660L;
private final Class<T> typeClass;
@Experimental
public EnumTypeInfo(Class<T> typeClass) {
Preconditions.checkNotNull(typeClass, "Enum type class must not be null.");
......@@ -49,41 +53,49 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen
}
@Override
@Experimental
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
return new EnumComparator<T>(sortOrderAscending);
}
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return 1;
}
@Override
@Experimental
public int getTotalFields() {
return 1;
}
@Override
@Experimental
public Class<T> getTypeClass() {
return this.typeClass;
}
@Override
@Experimental
public boolean isKeyType() {
return true;
}
@Override
@Experimental
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return new EnumSerializer<T>(typeClass);
}
......
......@@ -19,6 +19,8 @@
package org.apache.flink.api.java.typeutils;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -28,53 +30,63 @@ import org.apache.flink.api.java.typeutils.runtime.GenericTypeComparator;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
@Public
public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType<T> {
private static final long serialVersionUID = -7959114120287706504L;
private final Class<T> typeClass;
@Experimental
public GenericTypeInfo(Class<T> typeClass) {
this.typeClass = Preconditions.checkNotNull(typeClass);
}
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return 1;
}
@Override
@Experimental
public int getTotalFields() {
return 1;
}
@Override
@Experimental
public Class<T> getTypeClass() {
return typeClass;
}
@Override
@Experimental
public boolean isKeyType() {
return Comparable.class.isAssignableFrom(typeClass);
}
@Override
@Experimental
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
return new KryoSerializer<T>(this.typeClass, config);
}
@SuppressWarnings("unchecked")
@Override
@Experimental
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
if (isKeyType()) {
@SuppressWarnings("rawtypes")
......
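A short sketch of the behavior visible in the GenericTypeInfo hunk above: serialization always falls back to Kryo, and key usage requires Comparable. The two sample classes are illustrative:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;

public class GenericTypeInfoSketch {
    static class Opaque { int x; }                        // does not implement Comparable
    static class Ranked implements Comparable<Ranked> {
        @Override
        public int compareTo(Ranked other) { return 0; }
    }

    public static void main(String[] args) {
        GenericTypeInfo<Opaque> opaque = new GenericTypeInfo<>(Opaque.class);
        GenericTypeInfo<Ranked> ranked = new GenericTypeInfo<>(Ranked.class);

        // createSerializer(...) above always produces a KryoSerializer.
        System.out.println(opaque.createSerializer(new ExecutionConfig())
                .getClass().getSimpleName()); // KryoSerializer

        // isKeyType() above checks assignability to Comparable.
        System.out.println(opaque.isKeyType()); // false
        System.out.println(ranked.isKeyType()); // true
    }
}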
......@@ -19,6 +19,7 @@
package org.apache.flink.api.java.typeutils;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -29,6 +30,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
* called when the output format is used with an output method such as
* {@link org.apache.flink.api.java.DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
*/
@Public
public interface InputTypeConfigurable {
/**
......
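A sketch of how an output format could implement the interface above. The setInputType(TypeInformation, ExecutionConfig) signature is assumed from the imports in this hunk, and the class name is hypothetical:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;

// Hypothetical output format that wants to know the type of the DataSet written
// into it. The framework calls setInputType(...) when the format is passed to an
// output method such as DataSet#output(...).
public class TypeAwareOutputFormatSketch implements InputTypeConfigurable {

    private TypeInformation<?> inputType;

    @Override
    public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        this.inputType = type; // e.g. used later to create a serializer
    }
}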
......@@ -20,12 +20,16 @@ package org.apache.flink.api.java.typeutils;
import java.lang.reflect.Array;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
import com.google.common.base.Preconditions;
@Public
public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
private static final long serialVersionUID = 1L;
......@@ -41,42 +45,50 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
// --------------------------------------------------------------------------------------------
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return 1;
}
@Override
@Experimental
public int getTotalFields() {
return 1;
}
@SuppressWarnings("unchecked")
@Override
@Experimental
public Class<T> getTypeClass() {
return arrayType;
}
@Experimental
public TypeInformation<C> getComponentInfo() {
return componentInfo;
}
@Override
@Experimental
public boolean isKeyType() {
return false;
}
@SuppressWarnings("unchecked")
@Override
@Experimental
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return (TypeSerializer<T>) new GenericArraySerializer<C>(
componentInfo.getTypeClass(),
......@@ -114,6 +126,7 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
// --------------------------------------------------------------------------------------------
@Experimental
public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) {
Preconditions.checkNotNull(arrayClass);
Preconditions.checkNotNull(componentInfo);
......@@ -131,6 +144,7 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
* {@link java.lang.reflect.Type} or {@link java.lang.Class}.
*/
@SuppressWarnings("unchecked")
@Experimental
public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
Preconditions.checkNotNull(componentInfo);
......
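A usage sketch for the two getInfoFor(...) factories above, assuming the BasicTypeInfo constants from flink-core:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;

public class ObjectArrayTypeInfoSketch {
    public static void main(String[] args) {
        // Variant 1: explicit array class plus component type information.
        ObjectArrayTypeInfo<String[], String> explicit =
                ObjectArrayTypeInfo.getInfoFor(String[].class, BasicTypeInfo.STRING_TYPE_INFO);

        // Variant 2: the array class is derived from the component type alone.
        ObjectArrayTypeInfo<String[], String> derived =
                ObjectArrayTypeInfo.getInfoFor(BasicTypeInfo.STRING_TYPE_INFO);

        System.out.println(explicit.getTypeClass());    // class [Ljava.lang.String;
        System.out.println(derived.getComponentInfo()); // String type information
        System.out.println(explicit.isKeyType());       // false, per isKeyType() above
    }
}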
......@@ -29,6 +29,8 @@ import java.util.regex.Pattern;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
......@@ -54,6 +56,7 @@ import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
*
* @param <T> The type represented by this type information.
*/
@Public
public class PojoTypeInfo<T> extends CompositeType<T> {
private static final long serialVersionUID = 1L;
......@@ -71,6 +74,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
private final int totalFields;
@Experimental
public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
super(typeClass);
......@@ -96,27 +100,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
}
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return fields.length;
}
@Override
@Experimental
public int getTotalFields() {
return totalFields;
}
@Override
@Experimental
public boolean isSortKeyType() {
// Support for sorting POJOs that implement Comparable is not implemented yet.
// Since the order of fields in a POJO type is not well defined, sorting on fields
......@@ -126,6 +135,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
@Override
@Experimental
public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
......@@ -202,6 +212,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
@SuppressWarnings("unchecked")
@Override
@Experimental
public <X> TypeInformation<X> getTypeAt(String fieldExpression) {
Matcher matcher = PATTERN_NESTED_FIELDS.matcher(fieldExpression);
......@@ -242,6 +253,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
}
@Override
@Experimental
public <X> TypeInformation<X> getTypeAt(int pos) {
if (pos < 0 || pos >= this.fields.length) {
throw new IndexOutOfBoundsException();
......@@ -257,6 +269,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
}
// used for testing. Maybe use mockito here
@Experimental
public PojoField getPojoFieldAt(int pos) {
if (pos < 0 || pos >= this.fields.length) {
throw new IndexOutOfBoundsException();
......@@ -264,6 +277,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
return this.fields[pos];
}
@Experimental
public String[] getFieldNames() {
String[] result = new String[fields.length];
for (int i = 0; i < fields.length; i++) {
......@@ -273,6 +287,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
}
@Override
@Experimental
public int getFieldIndex(String fieldName) {
for (int i = 0; i < fields.length; i++) {
if (fields[i].getField().getName().equals(fieldName)) {
......@@ -283,6 +298,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
}
@Override
@Experimental
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
if(config.isForceKryoEnabled()) {
return new KryoSerializer<T>(getTypeClass(), config);
......
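To make the field-expression methods above concrete, a small sketch. It assumes TypeExtractor.createTypeInfo(Type) as the entry point, and the WordCount POJO is illustrative; the downcast to PojoTypeInfo is unchecked:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class PojoTypeInfoSketch {
    // A valid POJO: public no-argument constructor and public fields.
    public static class WordCount {
        public String word;
        public int count;
        public WordCount() {}
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        TypeInformation<?> anyInfo = TypeExtractor.createTypeInfo(WordCount.class);
        PojoTypeInfo<WordCount> info = (PojoTypeInfo<WordCount>) anyInfo;

        System.out.println(info.getArity());            // 2 (number of POJO fields)
        System.out.println(info.getFieldIndex("word")); // position of the "word" field
        System.out.println(info.getTypeAt("count"));    // Integer type information
        System.out.println(info.isSortKeyType());       // false, see the comment above
    }
}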
......@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.TypeInformation;
/**
......@@ -26,6 +27,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
* that is otherwise performed and is useful in situations where the produced data type may vary
* depending on parametrization.
*/
@Public
public interface ResultTypeQueryable<T> {
/**
......
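A sketch of a function that declares its produced type explicitly through the interface above, bypassing reflective extraction; the function itself is hypothetical:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

// Hypothetical function whose output type is declared explicitly instead of
// being inferred from the class signature.
public class ParseToLong implements MapFunction<String, Long>, ResultTypeQueryable<Long> {

    @Override
    public Long map(String value) {
        return Long.parseLong(value.trim());
    }

    @Override
    public TypeInformation<Long> getProducedType() {
        return BasicTypeInfo.LONG_TYPE_INFO;
    }
}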
......@@ -24,6 +24,8 @@ import java.util.Collections;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
......@@ -43,6 +45,7 @@ import org.apache.flink.types.Value;
*
* @param <T> The type of the tuple.
*/
@Public
public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
private static final long serialVersionUID = 1L;
......@@ -50,10 +53,12 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
protected final String[] fieldNames;
@SuppressWarnings("unchecked")
@Experimental
public TupleTypeInfo(TypeInformation<?>... types) {
this((Class<T>) Tuple.getTupleClass(types.length), types);
}
@Experimental
public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
super(tupleType, types);
......@@ -69,11 +74,13 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
}
@Override
@Experimental
public String[] getFieldNames() {
return fieldNames;
}
@Override
@Experimental
public int getFieldIndex(String fieldName) {
int fieldIndex = Integer.parseInt(fieldName.substring(1));
if (fieldIndex >= getArity()) {
......@@ -84,6 +91,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
@SuppressWarnings("unchecked")
@Override
@Experimental
public TupleSerializer<T> createSerializer(ExecutionConfig executionConfig) {
if (getTypeClass() == Tuple0.class) {
return (TupleSerializer<T>) Tuple0Serializer.INSTANCE;
......@@ -191,6 +199,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
// --------------------------------------------------------------------------------------------
@Experimental
public static <X extends Tuple> TupleTypeInfo<X> getBasicTupleTypeInfo(Class<?>... basicTypes) {
if (basicTypes == null || basicTypes.length == 0) {
throw new IllegalArgumentException();
......@@ -216,6 +225,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
}
@SuppressWarnings("unchecked")
@Experimental
public static <X extends Tuple> TupleTypeInfo<X> getBasicAndBasicValueTupleTypeInfo(Class<?>... basicTypes) {
if (basicTypes == null || basicTypes.length == 0) {
throw new IllegalArgumentException();
......
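A usage sketch for the constructors and the getBasicTupleTypeInfo(...) factory above, assuming the standard Tuple2 class and BasicTypeInfo constants:

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

public class TupleTypeInfoSketch {
    public static void main(String[] args) {
        // Explicit construction from per-field type information.
        TupleTypeInfo<Tuple2<String, Integer>> explicit =
                new TupleTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);

        // Convenience factory for tuples of basic types only.
        TupleTypeInfo<Tuple2<String, Integer>> basic =
                TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class);

        // Field names follow the positional f0, f1, ... scheme,
        // which is what getFieldIndex(...) above parses.
        System.out.println(explicit.getFieldIndex("f1")); // 1
        System.out.println(basic.getArity());             // 2
    }
}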
......@@ -32,7 +32,9 @@ import java.util.List;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.commons.lang3.ClassUtils;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
......@@ -71,6 +73,7 @@ import com.google.common.base.Preconditions;
* A utility for reflection analysis on classes, to determine the return type of implementations of transformation
* functions.
*/
@Public
public class TypeExtractor {
/*
......@@ -104,11 +107,13 @@ public class TypeExtractor {
// --------------------------------------------------------------------------------------------
// Function specific methods
// --------------------------------------------------------------------------------------------
@Experimental
public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType) {
return getMapReturnTypes(mapInterface, inType, null, false);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getMapReturnTypes(MapFunction<IN, OUT> mapInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
......@@ -116,136 +121,151 @@ public class TypeExtractor {
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType) {
return getFlatMapReturnTypes(flatMapInterface, inType, null, false);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getFlatMapReturnTypes(FlatMapFunction<IN, OUT> flatMapInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType((Function) flatMapInterface, FlatMapFunction.class, false, true, inType, functionName, allowMissing);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType)
{
return getFoldReturnTypes(foldInterface, inType, null, false);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getFoldReturnTypes(FoldFunction<IN, OUT> foldInterface, TypeInformation<IN> inType, String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType((Function) foldInterface, FoldFunction.class, false, false, inType, functionName, allowMissing);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType) {
return getMapPartitionReturnTypes(mapPartitionInterface, inType, null, false);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getMapPartitionReturnTypes(MapPartitionFunction<IN, OUT> mapPartitionInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType((Function) mapPartitionInterface, MapPartitionFunction.class, true, true, inType, functionName, allowMissing);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType) {
return getGroupReduceReturnTypes(groupReduceInterface, inType, null, false);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getGroupReduceReturnTypes(GroupReduceFunction<IN, OUT> groupReduceInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType((Function) groupReduceInterface, GroupReduceFunction.class, true, true, inType, functionName, allowMissing);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType) {
return getGroupCombineReturnTypes(combineInterface, inType, null, false);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getGroupCombineReturnTypes(GroupCombineFunction<IN, OUT> combineInterface, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType((Function) combineInterface, GroupCombineFunction.class, true, true, inType, functionName, allowMissing);
}
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type)
{
return getFlatJoinReturnTypes(joinInterface, in1Type, in2Type, null, false);
}
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> getFlatJoinReturnTypes(FlatJoinFunction<IN1, IN2, OUT> joinInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType((Function) joinInterface, FlatJoinFunction.class, false, true,
in1Type, in2Type, functionName, allowMissing);
}
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type)
{
return getJoinReturnTypes(joinInterface, in1Type, in2Type, null, false);
}
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> getJoinReturnTypes(JoinFunction<IN1, IN2, OUT> joinInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType((Function) joinInterface, JoinFunction.class, false, false,
in1Type, in2Type, functionName, allowMissing);
}
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type)
{
return getCoGroupReturnTypes(coGroupInterface, in1Type, in2Type, null, false);
}
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType((Function) coGroupInterface, CoGroupFunction.class, true, true,
in1Type, in2Type, functionName, allowMissing);
}
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type)
{
return getCrossReturnTypes(crossInterface, in1Type, in2Type, null, false);
}
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> getCrossReturnTypes(CrossFunction<IN1, IN2, OUT> crossInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType((Function) crossInterface, CrossFunction.class, false, false,
in1Type, in2Type, functionName, allowMissing);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface, TypeInformation<IN> inType) {
return getKeySelectorTypes(selectorInterface, inType, null, false);
}
@Experimental
public static <IN, OUT> TypeInformation<OUT> getKeySelectorTypes(KeySelector<IN, OUT> selectorInterface,
TypeInformation<IN> inType, String functionName, boolean allowMissing)
{
return getUnaryOperatorReturnType((Function) selectorInterface, KeySelector.class, false, false, inType, functionName, allowMissing);
}
@Experimental
public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner) {
return getPartitionerTypes(partitioner, null, false);
}
@Experimental
public static <T> TypeInformation<T> getPartitionerTypes(Partitioner<T> partitioner, String functionName, boolean allowMissing) {
return new TypeExtractor().privateCreateTypeInfo(Partitioner.class, partitioner.getClass(), 0, null, null);
}
@SuppressWarnings("unchecked")
@Experimental
public static <IN> TypeInformation<IN> getInputFormatTypes(InputFormat<IN, ?> inputFormatInterface) {
if (inputFormatInterface instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<IN>) inputFormatInterface).getProducedType();
......@@ -258,6 +278,7 @@ public class TypeExtractor {
// --------------------------------------------------------------------------------------------
@SuppressWarnings("unchecked")
@Experimental
public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass,
boolean hasIterable, boolean hasCollector, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
......@@ -295,6 +316,7 @@ public class TypeExtractor {
}
@SuppressWarnings("unchecked")
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> getBinaryOperatorReturnType(Function function, Class<?> baseClass,
boolean hasIterables, boolean hasCollector, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type,
String functionName, boolean allowMissing)
......@@ -366,6 +388,7 @@ public class TypeExtractor {
* @return type information
*/
@SuppressWarnings("unchecked")
@Experimental
public static <OUT> TypeInformation<OUT> createTypeInfo(Object instance, Class<?> baseClass, Class<?> clazz, int returnParamPos) {
if (instance instanceof ResultTypeQueryable) {
return ((ResultTypeQueryable<OUT>) instance).getProducedType();
......@@ -373,7 +396,8 @@ public class TypeExtractor {
return createTypeInfo(baseClass, clazz, returnParamPos, null, null);
}
}
@Experimental
public static <IN1, IN2, OUT> TypeInformation<OUT> createTypeInfo(Class<?> baseClass, Class<?> clazz, int returnParamPos,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
TypeInformation<OUT> ti = new TypeExtractor().privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);
......@@ -765,7 +789,8 @@ public class TypeExtractor {
// --------------------------------------------------------------------------------------------
// Extract type parameters
// --------------------------------------------------------------------------------------------
@Experimental
public static Type getParameterType(Class<?> baseClass, Class<?> clazz, int pos) {
return getParameterType(baseClass, null, clazz, pos);
}
......@@ -1556,6 +1581,7 @@ public class TypeExtractor {
* This is required because class.getFields() is not returning fields defined
* in parent classes.
*/
@Experimental
public static List<Field> getAllDeclaredFields(Class<?> clazz) {
List<Field> result = new ArrayList<Field>();
while (clazz != null) {
......@@ -1574,7 +1600,8 @@ public class TypeExtractor {
}
return result;
}
@Experimental
public static Field getDeclaredField(Class<?> clazz, String name) {
for (Field field : getAllDeclaredFields(clazz)) {
if (field.getName().equals(name)) {
......@@ -1607,7 +1634,7 @@ public class TypeExtractor {
return result;
}
// not public to users
@Internal
public static Class<?> typeToClass(Type t) {
if (t instanceof Class) {
return (Class<?>)t;
......@@ -1618,7 +1645,7 @@ public class TypeExtractor {
throw new IllegalArgumentException("Cannot convert type to class");
}
// not public to users
@Internal
public static boolean isClassType(Type t) {
return t instanceof Class<?> || t instanceof ParameterizedType;
}
......
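A minimal sketch of the reflection analysis described in the TypeExtractor Javadoc above, using the getMapReturnTypes(...) entry point from this hunk:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;

public class TypeExtractorSketch {
    public static void main(String[] args) {
        MapFunction<String, Integer> lengthFn = new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) {
                return value.length();
            }
        };

        // Reflectively determine that the map function produces Integer.
        TypeInformation<Integer> outType =
                TypeExtractor.getMapReturnTypes(lengthFn, BasicTypeInfo.STRING_TYPE_INFO);

        System.out.println(outType); // Integer type information
    }
}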
......@@ -24,13 +24,14 @@ import java.util.ArrayList;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Value;
@Public
public class TypeInfoParser {
private static final String TUPLE_PACKAGE = "org.apache.flink.api.java.tuple";
private static final String VALUE_PACKAGE = "org.apache.flink.types";
......
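A usage sketch for TypeInfoParser, assuming its static parse(String) method; short type names resolve against the tuple and value packages referenced above:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;

public class TypeInfoParserSketch {
    public static void main(String[] args) {
        // Describe a tuple of basic types as a string.
        TypeInformation<Tuple2<String, Integer>> info =
                TypeInfoParser.parse("Tuple2<String, Integer>");

        System.out.println(info.getArity());    // 2
        System.out.println(info.isTupleType()); // true
    }
}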
......@@ -19,6 +19,8 @@
package org.apache.flink.api.java.typeutils;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.AtomicType;
......@@ -48,6 +50,7 @@ import org.apache.flink.types.Value;
*
* @param <T> The type of the class represented by this type information.
*/
@Public
public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implements AtomicType<T> {
private static final long serialVersionUID = 1L;
......@@ -64,7 +67,8 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
public static final ValueTypeInfo<StringValue> STRING_VALUE_TYPE_INFO = new ValueTypeInfo<>(StringValue.class);
private final Class<T> type;
@Experimental
public ValueTypeInfo(Class<T> type) {
this.type = Preconditions.checkNotNull(type);
......@@ -74,25 +78,30 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
}
@Override
@Experimental
public int getArity() {
return 1;
}
@Override
@Experimental
public int getTotalFields() {
return 1;
}
@Override
@Experimental
public Class<T> getTypeClass() {
return this.type;
}
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Experimental
public boolean isBasicValueType() {
return type.equals(StringValue.class) || type.equals(ByteValue.class) || type.equals(ShortValue.class) || type.equals(CharValue.class) ||
type.equals(DoubleValue.class) || type.equals(FloatValue.class) || type.equals(IntValue.class) || type.equals(LongValue.class) ||
......@@ -100,17 +109,20 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public boolean isKeyType() {
return Comparable.class.isAssignableFrom(type);
}
@Override
@SuppressWarnings("unchecked")
@Experimental
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
if (CopyableValue.class.isAssignableFrom(type)) {
return (TypeSerializer<T>) createCopyableValueSerializer(type.asSubclass(CopyableValue.class));
......@@ -122,6 +134,7 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
@Experimental
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
if (!isKeyType()) {
throw new RuntimeException("The type " + type.getName() + " is not Comparable.");
......@@ -171,7 +184,8 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
}
// --------------------------------------------------------------------------------------------
@Experimental
static <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) {
if (Value.class.isAssignableFrom(typeClass) && !typeClass.equals(Value.class)) {
return new ValueTypeInfo<X>(typeClass);
......
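A short sketch of the ValueTypeInfo behavior above, using the predefined STRING_VALUE_TYPE_INFO constant from this hunk:

import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.types.StringValue;

public class ValueTypeInfoSketch {
    public static void main(String[] args) {
        // Equivalent to new ValueTypeInfo<>(StringValue.class).
        ValueTypeInfo<StringValue> info = ValueTypeInfo.STRING_VALUE_TYPE_INFO;

        System.out.println(info.isBasicValueType()); // true, per isBasicValueType() above
        System.out.println(info.isKeyType());        // true, StringValue is Comparable
        System.out.println(info.isTupleType());      // false
    }
}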
......@@ -19,6 +19,8 @@
package org.apache.flink.api.java.typeutils;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.AtomicType;
......@@ -35,12 +37,14 @@ import org.apache.hadoop.io.Writable;
*
* @param <T> The type of the class represented by this type information.
*/
@Public
public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> implements AtomicType<T> {
private static final long serialVersionUID = 1L;
private final Class<T> typeClass;
@Experimental
public WritableTypeInfo(Class<T> typeClass) {
this.typeClass = Preconditions.checkNotNull(typeClass);
......@@ -51,6 +55,7 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
@Experimental
public TypeComparator<T> createComparator(boolean sortOrderAscending, ExecutionConfig executionConfig) {
if(Comparable.class.isAssignableFrom(typeClass)) {
return new WritableComparator(sortOrderAscending, typeClass);
......@@ -62,36 +67,43 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
}
@Override
@Experimental
public boolean isBasicType() {
return false;
}
@Override
@Experimental
public boolean isTupleType() {
return false;
}
@Override
@Experimental
public int getArity() {
return 1;
}
@Override
@Experimental
public int getTotalFields() {
return 1;
}
@Override
@Experimental
public Class<T> getTypeClass() {
return this.typeClass;
}
@Override
@Experimental
public boolean isKeyType() {
return Comparable.class.isAssignableFrom(typeClass);
}
@Override
@Experimental
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
return new WritableSerializer<T>(typeClass);
}
......@@ -126,7 +138,8 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
}
// --------------------------------------------------------------------------------------------
@Experimental
static <T extends Writable> TypeInformation<T> getWritableTypeInfo(Class<T> typeClass) {
if (Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class)) {
return new WritableTypeInfo<T>(typeClass);
......
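A usage sketch for WritableTypeInfo with a standard Hadoop type; Text implements both Writable and Comparable, which drives the key check above:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

public class WritableTypeInfoSketch {
    public static void main(String[] args) {
        WritableTypeInfo<Text> info = new WritableTypeInfo<>(Text.class);

        System.out.println(info.isKeyType()); // true, Text is Comparable
        System.out.println(info.createSerializer(new ExecutionConfig())
                .getClass().getSimpleName());  // WritableSerializer, per the hunk above
    }
}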
......@@ -18,6 +18,8 @@
package org.apache.flink.types;
import org.apache.flink.annotation.Public;
/**
 * This type represents a value of one of two possible types, Left or Right (a
* disjoint union), inspired by Scala's Either type.
......@@ -27,6 +29,7 @@ package org.apache.flink.types;
* @param <R>
* the type of Right
*/
@Public
public abstract class Either<L, R> {
/**
......
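A usage sketch of the disjoint-union semantics described above, assuming the static Left/Right factory methods and the isLeft/isRight/left/right accessors of Flink's Either:

import org.apache.flink.types.Either;

public class EitherSketch {
    // Parse a string into an int, or carry the error message on failure.
    static Either<String, Integer> tryParse(String s) {
        try {
            return Either.Right(Integer.parseInt(s));
        } catch (NumberFormatException e) {
            return Either.Left("not a number: " + s);
        }
    }

    public static void main(String[] args) {
        Either<String, Integer> ok = tryParse("42");
        Either<String, Integer> bad = tryParse("x");

        if (ok.isRight()) {
            System.out.println(ok.right()); // 42
        }
        if (bad.isLeft()) {
            System.out.println(bad.left()); // not a number: x
        }
    }
}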
......@@ -15,13 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.types;
import org.apache.flink.annotation.Public;
/**
* A type for (synthetic) operators that do not output data. For example, data sinks.
*/
@Public
public class Nothing {
private Nothing() {}
}
......@@ -167,6 +167,7 @@ class TupleGenerator {
// package and imports
w.println("package " + PACKAGE + ';');
w.println();
w.println("import PublicInterface;");
w.println("import org.apache.flink.util.StringUtils;");
w.println();
......@@ -185,6 +186,7 @@ class TupleGenerator {
w.println(" * @param <" + GEN_TYPE_PREFIX + i + "> The type of field " + i);
}
w.println(" */");
w.println("@PublicInterface");
w.print("public class " + className + "<");
for (int i = 0; i < numFields; i++) {
if (i > 0) {
......@@ -443,10 +445,12 @@ class TupleGenerator {
w.println("import java.util.ArrayList;");
w.println("import java.util.List;");
w.println();
w.println("import PublicInterface;");
w.println("import " + PACKAGE + ".Tuple" + numFields + ";");
w.println();
// class declaration
w.println("@PublicInterface");
w.print("public class " + className);
printGenericsString(w, numFields);
w.println(" {");
......
......@@ -75,9 +75,9 @@ under the License.
</dependencies>
<!-- Because flink-scala and flink-avro uses it in tests -->
<build>
<plugins>
<!-- Because flink-scala and flink-avro uses it in tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
......
......@@ -19,6 +19,8 @@
package org.apache.flink.api.java;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
......@@ -102,6 +104,8 @@ import java.util.List;
*
* @param <T> The type of the DataSet, i.e., the type of the elements of the DataSet.
*/
@Public
public abstract class DataSet<T> {
protected final ExecutionEnvironment context;
......@@ -1638,6 +1642,7 @@ public abstract class DataSet<T> {
* @deprecated Use {@link #printOnTaskManager(String)} instead.
*/
@Deprecated
@Experimental
public DataSink<T> print(String sinkIdentifier) {
return output(new PrintingOutputFormat<T>(sinkIdentifier, false));
}
......@@ -1654,6 +1659,7 @@ public abstract class DataSet<T> {
* {@link PrintingOutputFormat} instead.
*/
@Deprecated
@Experimental
public DataSink<T> printToErr(String sinkIdentifier) {
return output(new PrintingOutputFormat<T>(sinkIdentifier, true));
}
......
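A sketch of the replacement named in the deprecation notes above: printOnTaskManager(String) instead of the deprecated print(String) sink. The job name is illustrative:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class PrintSinkSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> data = env.fromElements("a", "b", "c");

        // Deprecated: data.print("my-sink");
        // Preferred, per the Javadoc above: prefix output on the TaskManagers.
        data.printOnTaskManager("my-sink");

        // printOnTaskManager(...) adds a lazy sink, so the plan must be executed.
        env.execute("print sink sample");
    }
}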
......@@ -23,6 +23,9 @@ import com.esotericsoftware.kryo.Serializer;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
......@@ -93,6 +96,7 @@ import java.util.List;
* @see LocalEnvironment
* @see RemoteEnvironment
*/
@Public
public abstract class ExecutionEnvironment {
/** The logger used by the environment and its subclasses */
......@@ -184,6 +188,7 @@ public abstract class ExecutionEnvironment {
*
* @param numberOfExecutionRetries The number of times the system will try to re-execute failed tasks.
*/
@Experimental
public void setNumberOfExecutionRetries(int numberOfExecutionRetries) {
config.setNumberOfExecutionRetries(numberOfExecutionRetries);
}
......@@ -195,6 +200,7 @@ public abstract class ExecutionEnvironment {
*
* @return The number of times the system will try to re-execute failed tasks.
*/
@Experimental
public int getNumberOfExecutionRetries() {
return config.getNumberOfExecutionRetries();
}
......@@ -219,6 +225,7 @@ public abstract class ExecutionEnvironment {
* @return The JobID of this environment.
* @see #getIdString()
*/
@Experimental
public JobID getId() {
return this.jobID;
}
......@@ -229,6 +236,7 @@ public abstract class ExecutionEnvironment {
* @return The JobID as a string.
* @see #getId()
*/
@Experimental
public String getIdString() {
return this.jobID.toString();
}
......@@ -239,6 +247,7 @@ public abstract class ExecutionEnvironment {
*
* @param timeout The timeout, in seconds.
*/
@Experimental
public void setSessionTimeout(long timeout) {
throw new IllegalStateException("Support for sessions is currently disabled. " +
"It will be enabled in future Flink versions.");
......@@ -256,6 +265,7 @@ public abstract class ExecutionEnvironment {
*
* @return The session timeout, in seconds.
*/
@Experimental
public long getSessionTimeout() {
return sessionTimeout;
}
......@@ -263,6 +273,7 @@ public abstract class ExecutionEnvironment {
/**
* Starts a new session, discarding the previous data flow and all of its intermediate results.
*/
@Experimental
public abstract void startNewSession() throws Exception;
// --------------------------------------------------------------------------------------------
......@@ -547,6 +558,7 @@ public abstract class ExecutionEnvironment {
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. The
* given inputName is set on the given job.
*/
@Experimental
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath, JobConf job) {
DataSource<Tuple2<K, V>> result = createHadoopInput(mapredInputFormat, key, value, job);
......@@ -559,6 +571,7 @@ public abstract class ExecutionEnvironment {
* Creates a {@link DataSet} from {@link org.apache.hadoop.mapred.SequenceFileInputFormat}
* A {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
*/
@Experimental
public <K,V> DataSource<Tuple2<K, V>> readSequenceFile(Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(new org.apache.hadoop.mapred.SequenceFileInputFormat<K, V>(), key, value, inputPath);
}
......@@ -567,6 +580,7 @@ public abstract class ExecutionEnvironment {
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.FileInputFormat}. A
* {@link org.apache.hadoop.mapred.JobConf} with the given inputPath is created.
*/
@Experimental
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapred.FileInputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, String inputPath) {
return readHadoopFile(mapredInputFormat, key, value, inputPath, new JobConf());
}
......@@ -574,6 +588,7 @@ public abstract class ExecutionEnvironment {
/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapred.InputFormat}.
*/
@Experimental
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
HadoopInputFormat<K, V> hadoopInputFormat = new HadoopInputFormat<>(mapredInputFormat, key, value, job);
......@@ -584,6 +599,7 @@ public abstract class ExecutionEnvironment {
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. The
* given inputName is set on the given job.
*/
@Experimental
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath, Job job) throws IOException {
DataSource<Tuple2<K, V>> result = createHadoopInput(mapreduceInputFormat, key, value, job);
......@@ -597,6 +613,7 @@ public abstract class ExecutionEnvironment {
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}. A
* {@link org.apache.hadoop.mapreduce.Job} with the given inputPath is created.
*/
@Experimental
public <K,V> DataSource<Tuple2<K, V>> readHadoopFile(org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, String inputPath) throws IOException {
return readHadoopFile(mapreduceInputFormat, key, value, inputPath, Job.getInstance());
}
......@@ -604,6 +621,7 @@ public abstract class ExecutionEnvironment {
/**
* Creates a {@link DataSet} from the given {@link org.apache.hadoop.mapreduce.InputFormat}.
*/
@Experimental
public <K,V> DataSource<Tuple2<K, V>> createHadoopInput(org.apache.hadoop.mapreduce.InputFormat<K,V> mapreduceInputFormat, Class<K> key, Class<V> value, Job job) {
org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<K, V> hadoopInputFormat = new org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat<>(mapreduceInputFormat, key, value, job);
......@@ -890,6 +908,7 @@ public abstract class ExecutionEnvironment {
*
* @return The program's plan.
*/
@Internal
public Plan createProgramPlan() {
return createProgramPlan(null);
}
......@@ -905,6 +924,7 @@ public abstract class ExecutionEnvironment {
* @param jobName The name attached to the plan (displayed in logs and monitoring).
* @return The program's plan.
*/
@Internal
public Plan createProgramPlan(String jobName) {
return createProgramPlan(jobName, true);
}
......@@ -920,6 +940,7 @@ public abstract class ExecutionEnvironment {
* @param clearSinks Whether or not to start a new stage of execution.
* @return The program's plan.
*/
@Internal
public Plan createProgramPlan(String jobName, boolean clearSinks) {
if (this.sinks.isEmpty()) {
if (wasExecuted) {
......@@ -1014,6 +1035,7 @@ public abstract class ExecutionEnvironment {
*
* @param sink The sink to add for execution.
*/
@Internal
void registerDataSink(DataSink<?> sink) {
this.sinks.add(sink);
}
......@@ -1050,6 +1072,7 @@ public abstract class ExecutionEnvironment {
* memory. parallelism will always be 1. This is useful during implementation and for debugging.
* @return A Collection Environment
*/
@Experimental
public static CollectionEnvironment createCollectionsEnvironment(){
CollectionEnvironment ce = new CollectionEnvironment();
ce.setParallelism(1);
......@@ -1195,6 +1218,7 @@ public abstract class ExecutionEnvironment {
* @return True, if it is possible to explicitly instantiate a LocalEnvironment or a
* RemoteEnvironment, false otherwise.
*/
@Internal
public static boolean areExplicitEnvironmentsAllowed() {
return contextEnvironmentFactory == null;
}
......
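A sketch of the Hadoop input helpers annotated above, using readSequenceFile(...) from this hunk; the HDFS path is a placeholder:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class HadoopInputSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read a Hadoop SequenceFile as a DataSet of (key, value) tuples.
        DataSet<Tuple2<LongWritable, Text>> input =
                env.readSequenceFile(LongWritable.class, Text.class, "hdfs:///path/to/data");

        // print() is eager and triggers execution of the plan.
        input.first(10).print();
    }
}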
......@@ -18,9 +18,12 @@
package org.apache.flink.api.java;
import org.apache.flink.annotation.Public;
/**
* Factory class for execution environments.
*/
@Public
public interface ExecutionEnvironmentFactory {
/**
......
......@@ -18,6 +18,8 @@
package org.apache.flink.api.java;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
......@@ -36,6 +38,7 @@ import org.apache.flink.configuration.Configuration;
* and {@link ExecutionEnvironment#createLocalEnvironment(int)}. The former version will pick a
* default parallelism equal to the number of hardware contexts in the local machine.
*/
@Public
public class LocalEnvironment extends ExecutionEnvironment {
/** The user-defined configuration for the local execution */
......@@ -106,6 +109,7 @@ public class LocalEnvironment extends ExecutionEnvironment {
}
@Override
@Experimental
public void startNewSession() throws Exception {
if (executor != null) {
// we need to end the previous session
......
......@@ -18,6 +18,8 @@
package org.apache.flink.api.java;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
......@@ -39,6 +41,7 @@ import java.net.URL;
* must be attached to the remote environment as JAR files, to allow the environment to ship the
* classes into the cluster for the distributed execution.
*/
@Public
public class RemoteEnvironment extends ExecutionEnvironment {
/** The hostname of the JobManager */
......@@ -180,6 +183,7 @@ public class RemoteEnvironment extends ExecutionEnvironment {
}
@Override
@Experimental
public void startNewSession() throws Exception {
dispose();
jobID = JobID.generate();
......
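A sketch of submitting against a RemoteEnvironment as described above; host, port, and jar path are placeholders, and the jar must contain the program's user code so it can be shipped to the cluster:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;

public class RemoteEnvironmentSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 6123, "/path/to/program.jar");

        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print(); // eager print triggers execution on the remote cluster
    }
}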