diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 6507ce9ea35d5eac67e6b8ad7d3fa43502598afa..61925f413837434e92b663af4e3bb1a92d71d7e4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -72,7 +72,11 @@ import java.util.Optional; * the timer service, timer callbacks are also guaranteed not to be called concurrently with * methods on {@code StreamOperator}. * - * @param The output type of the operator + *

Note, this class is going to be removed and replaced in the future by {@link AbstractStreamOperatorV2}. + * However as {@link AbstractStreamOperatorV2} is currently experimental, {@link AbstractStreamOperator} + * has not been deprecated just yet. + * + * @param The output type of the operator. */ @PublicEvolving public abstract class AbstractStreamOperator @@ -385,15 +389,18 @@ public abstract class AbstractStreamOperator * to interact with systems such as broadcast variables and managed state. This also allows * to register timers. */ + @VisibleForTesting public StreamingRuntimeContext getRuntimeContext() { return runtimeContext; } @SuppressWarnings("unchecked") + @VisibleForTesting public KeyedStateBackend getKeyedStateBackend() { return stateHandler.getKeyedStateBackend(); } + @VisibleForTesting public OperatorStateBackend getOperatorStateBackend() { return stateHandler.getOperatorStateBackend(); } @@ -402,6 +409,7 @@ public abstract class AbstractStreamOperator * Returns the {@link ProcessingTimeService} responsible for getting the current * processing time and registering timers. */ + @VisibleForTesting public ProcessingTimeService getProcessingTimeService() { return processingTimeService; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java new file mode 100644 index 0000000000000000000000000000000000000000..a4ad021d2438d4f211a3ba04d099b9d81d3597a0 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java @@ -0,0 +1,507 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.CheckpointedStreamOperator; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, intended to eventually replace {@link AbstractStreamOperator}. + * Currently intended to work smoothly just with {@link MultipleInputStreamOperator}. + * + *

One note-able difference in comparison to {@link AbstractStreamOperator} is lack of + * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation + * in the constructor, and removed some tight coupling with classes like {@link StreamTask}. + * + *

Methods are guaranteed not to be called concurrently. + * + * @param The output type of the operator + */ +@Experimental +public abstract class AbstractStreamOperatorV2 implements StreamOperator, CheckpointedStreamOperator { + /** The logger used by the operator class and its subclasses. */ + protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperatorV2.class); + + protected final StreamConfig config; + protected final Output> output; + private final StreamingRuntimeContext runtimeContext; + private final ExecutionConfig executionConfig; + private final ClassLoader userCodeClassLoader; + private final CloseableRegistry cancelables; + private final long[] inputWatermarks; + + /** Metric group for the operator. */ + protected final OperatorMetricGroup metrics; + protected final LatencyStats latencyStats; + protected final ProcessingTimeService processingTimeService; + + private StreamOperatorStateHandler stateHandler; + private InternalTimeServiceManager timeServiceManager; + + // We keep track of watermarks from both inputs, the combined input is the minimum + // Once the minimum advances we emit a new watermark for downstream operators + private long combinedWatermark = Long.MIN_VALUE; + + public AbstractStreamOperatorV2(StreamOperatorParameters parameters, int numberOfInputs) { + inputWatermarks = new long[numberOfInputs]; + Arrays.fill(inputWatermarks, Long.MIN_VALUE); + final Environment environment = parameters.getContainingTask().getEnvironment(); + config = parameters.getStreamConfig(); + CountingOutput countingOutput; + OperatorMetricGroup operatorMetricGroup; + try { + operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName()); + countingOutput = new CountingOutput(parameters.getOutput(), operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()); + if (config.isChainStart()) { + operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask(); + } + if (config.isChainEnd()) { + operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask(); + } + } catch (Exception e) { + LOG.warn("An error occurred while instantiating task metrics.", e); + countingOutput = null; + operatorMetricGroup = null; + } + + if (countingOutput == null || operatorMetricGroup == null) { + metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); + output = parameters.getOutput(); + } + else { + metrics = operatorMetricGroup; + output = countingOutput; + } + + latencyStats = createLatencyStats( + environment.getTaskManagerInfo().getConfiguration(), + parameters.getContainingTask().getIndexInSubtaskGroup()); + + processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService()); + executionConfig = parameters.getContainingTask().getExecutionConfig(); + userCodeClassLoader = parameters.getContainingTask().getUserCodeClassLoader(); + cancelables = parameters.getContainingTask().getCancelables(); + + runtimeContext = new StreamingRuntimeContext( + environment, + environment.getAccumulatorRegistry().getUserMap(), + operatorMetricGroup, + getOperatorID(), + processingTimeService, + null); + } + + private LatencyStats createLatencyStats(Configuration taskManagerConfig, int indexInSubtaskGroup) { + try { + int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE); + if (historySize <= 0) { + LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize); + historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(); + } + + final String configuredGranularity = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY); + LatencyStats.Granularity granularity; + try { + granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException iae) { + granularity = LatencyStats.Granularity.OPERATOR; + LOG.warn( + "Configured value {} option for {} is invalid. Defaulting to {}.", + configuredGranularity, + MetricOptions.LATENCY_SOURCE_GRANULARITY.key(), + granularity); + } + TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent(); + return new LatencyStats(jobMetricGroup.addGroup("latency"), + historySize, + indexInSubtaskGroup, + getOperatorID(), + granularity); + } catch (Exception e) { + LOG.warn("An error occurred while instantiating latency metrics.", e); + return new LatencyStats( + UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), + 1, + 0, + new OperatorID(), + LatencyStats.Granularity.SINGLE); + } + } + + @Override + public MetricGroup getMetricGroup() { + return metrics; + } + + @Override + public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { + final TypeSerializer keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); + + final StreamOperatorStateContext context = + streamTaskStateManager.streamOperatorStateContext( + getOperatorID(), + getClass().getSimpleName(), + getProcessingTimeService(), + this, + keySerializer, + cancelables, + metrics); + + stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables); + timeServiceManager = context.internalTimerServiceManager(); + stateHandler.initializeOperatorState(this); + } + + /** + * This method is called immediately before any elements are processed, it should contain the + * operator's initialization logic, e.g. state initialization. + * + *

The default implementation does nothing. + * + * @throws Exception An exception in this method causes the operator to fail. + */ + @Override + public void open() throws Exception {} + + /** + * This method is called after all records have been added to the operators via the methods + * {@link OneInputStreamOperator#processElement(StreamRecord)}, or + * {@link TwoInputStreamOperator#processElement1(StreamRecord)} and + * {@link TwoInputStreamOperator#processElement2(StreamRecord)}. + * + *

The method is expected to flush all remaining buffered data. Exceptions during this flushing + * of buffered should be propagated, in order to cause the operation to be recognized asa failed, + * because the last data items are not processed properly. + * + * @throws Exception An exception in this method causes the operator to fail. + */ + @Override + public void close() throws Exception {} + + /** + * This method is called at the very end of the operator's life, both in the case of a successful + * completion of the operation, and in the case of a failure and canceling. + * + *

This method is expected to make a thorough effort to release all resources + * that the operator has acquired. + */ + @Override + public void dispose() throws Exception { + if (stateHandler != null) { + stateHandler.dispose(); + } + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + // the default implementation does nothing and accepts the checkpoint + // this is purely for subclasses to override + } + + @Override + public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, + CheckpointStreamFactory factory) throws Exception { + return stateHandler.snapshotState( + this, + Optional.ofNullable(timeServiceManager), + getOperatorName(), + checkpointId, + timestamp, + checkpointOptions, + factory); + } + + /** + * Stream operators with state, which want to participate in a snapshot need to override this hook method. + * + * @param context context that provides information and means required for taking a snapshot + */ + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + } + + /** + * Stream operators with state which can be restored need to override this hook method. + * + * @param context context that allows to register different states. + */ + @Override + public void initializeState(StateInitializationContext context) throws Exception { + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + stateHandler.notifyCheckpointComplete(checkpointId); + } + + // ------------------------------------------------------------------------ + // Properties and Services + // ------------------------------------------------------------------------ + + /** + * Gets the execution config defined on the execution environment of the job to which this + * operator belongs. + * + * @return The job's execution config. + */ + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + public StreamConfig getOperatorConfig() { + return config; + } + + public ClassLoader getUserCodeClassloader() { + return userCodeClassLoader; + } + + /** + * Return the operator name. If the runtime context has been set, then the task name with + * subtask index is returned. Otherwise, the simple class name is returned. + * + * @return If runtime context is set, then return task name with subtask index. Otherwise return + * simple class name. + */ + protected String getOperatorName() { + if (runtimeContext != null) { + return runtimeContext.getTaskNameWithSubtasks(); + } else { + return getClass().getSimpleName(); + } + } + + /** + * Returns a context that allows the operator to query information about the execution and also + * to interact with systems such as broadcast variables and managed state. This also allows + * to register timers. + */ + public StreamingRuntimeContext getRuntimeContext() { + return runtimeContext; + } + + @SuppressWarnings("unchecked") + @VisibleForTesting + public KeyedStateBackend getKeyedStateBackend() { + return (KeyedStateBackend) stateHandler.getKeyedStateBackend(); + } + + @VisibleForTesting + public OperatorStateBackend getOperatorStateBackend() { + return stateHandler.getOperatorStateBackend(); + } + + /** + * Returns the {@link ProcessingTimeService} responsible for getting the current + * processing time and registering timers. + */ + @VisibleForTesting + public ProcessingTimeService getProcessingTimeService() { + return processingTimeService; + } + + /** + * Creates a partitioned state handle, using the state backend configured for this task. + * + * @throws IllegalStateException Thrown, if the key/value state was already initialized. + * @throws Exception Thrown, if the state backend cannot create the key/value state. + */ + protected S getPartitionedState(StateDescriptor stateDescriptor) throws Exception { + return getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor); + } + + protected S getOrCreateKeyedState( + TypeSerializer namespaceSerializer, + StateDescriptor stateDescriptor) throws Exception { + return stateHandler.getOrCreateKeyedState(namespaceSerializer, stateDescriptor); + } + + /** + * Creates a partitioned state handle, using the state backend configured for this task. + * + * @throws IllegalStateException Thrown, if the key/value state was already initialized. + * @throws Exception Thrown, if the state backend cannot create the key/value state. + */ + protected S getPartitionedState( + N namespace, + TypeSerializer namespaceSerializer, + StateDescriptor stateDescriptor) throws Exception { + return stateHandler.getPartitionedState(namespace, namespaceSerializer, stateDescriptor); + } + + protected void internalSetKeyContextElement(StreamRecord record, KeySelector selector) throws Exception { + if (selector != null) { + Object key = selector.getKey(record.getValue()); + setCurrentKey(key); + } + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public void setCurrentKey(Object key) { + stateHandler.setCurrentKey(key); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public Object getCurrentKey() { + return stateHandler.getCurrentKey(); + } + + public Optional getKeyedStateStore() { + if (stateHandler == null) { + return Optional.empty(); + } + return stateHandler.getKeyedStateStore(); + } + + protected void reportOrForwardLatencyMarker(LatencyMarker marker) { + // all operators are tracking latencies + this.latencyStats.reportLatency(marker); + + // everything except sinks forwards latency markers + this.output.emitLatencyMarker(marker); + } + + // ------------------------------------------------------------------------ + // Watermark handling + // ------------------------------------------------------------------------ + + /** + * Returns a {@link InternalTimerService} that can be used to query current processing time + * and event time and to set timers. An operator can have several timer services, where + * each has its own namespace serializer. Timer services are differentiated by the string + * key that is given when requesting them, if you call this method with the same key + * multiple times you will get the same timer service instance in subsequent requests. + * + *

Timers are always scoped to a key, the currently active key of a keyed stream operation. + * When a timer fires, this key will also be set as the currently active key. + * + *

Each timer has attached metadata, the namespace. Different timer services + * can have a different namespace type. If you don't need namespace differentiation you + * can use {@link VoidNamespaceSerializer} as the namespace serializer. + * + * @param name The name of the requested timer service. If no service exists under the given + * name a new one will be created and returned. + * @param namespaceSerializer {@code TypeSerializer} for the timer namespace. + * @param triggerable The {@link Triggerable} that should be invoked when timers fire + * + * @param The type of the timer namespace. + */ + @VisibleForTesting + public InternalTimerService getInternalTimerService( + String name, + TypeSerializer namespaceSerializer, + Triggerable triggerable) { + if (timeServiceManager == null) { + throw new RuntimeException("The timer service has not been initialized."); + } + InternalTimeServiceManager keyedTimeServiceHandler = (InternalTimeServiceManager) timeServiceManager; + return keyedTimeServiceHandler.getInternalTimerService( + name, + namespaceSerializer, + triggerable, + stateHandler.getKeyedStateBackend()); + } + + public void processWatermark(Watermark mark) throws Exception { + if (timeServiceManager != null) { + timeServiceManager.advanceWatermark(mark); + } + output.emitWatermark(mark); + } + + protected void reportWatermark(Watermark mark, int inputId) throws Exception { + inputWatermarks[inputId] = mark.getTimestamp(); + long newMin = mark.getTimestamp(); + for (long inputWatermark : inputWatermarks) { + newMin = Math.min(inputWatermark, newMin); + } + if (newMin > combinedWatermark) { + combinedWatermark = newMin; + processWatermark(new Watermark(combinedWatermark)); + } + } + + @Override + public OperatorID getOperatorID() { + return config.getOperatorID(); + } + + @VisibleForTesting + public int numProcessingTimeTimers() { + return timeServiceManager == null ? 0 : timeServiceManager.numProcessingTimeTimers(); + } + + @VisibleForTesting + public int numEventTimeTimers() { + return timeServiceManager == null ? 0 : timeServiceManager.numEventTimeTimers(); + } + + @Override + public void setKeyContextElement1(StreamRecord record) throws Exception { + throw new IllegalStateException("This method should never be called. Use Input class instead"); + } + + @Override + public void setKeyContextElement2(StreamRecord record) throws Exception { + throw new IllegalStateException("This method should never be called. Use Input class instead"); + } + + protected Optional> getTimeServiceManager() { + return Optional.ofNullable(timeServiceManager); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java index 1f6f1a03a679b30bbd30857771cdbf5b5fdab5c0..5aaffcc46a0d5f1d079add3da7988db4cbe153c0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java @@ -25,9 +25,9 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; /** - * Helper class to construct {@link StreamOperatorBase}. Wraps couple of internal parameters - * to simplify for users construction of classes extending {@link StreamOperatorBase} and to - * allow for backward compatible changes in the {@link StreamOperatorBase}'s constructor. + * Helper class to construct {@link AbstractStreamOperatorV2}. Wraps couple of internal parameters + * to simplify for users construction of classes extending {@link AbstractStreamOperatorV2} and to + * allow for backward compatible changes in the {@link AbstractStreamOperatorV2}'s constructor. * * @param The output type of an operator that will be constructed using {@link StreamOperatorParameters}. */ diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java index fc0144b732bc7ceadb73c6ad03c4d2c7d657d6dc..88c4f9a66a88c57fd24f1a32f50d2da9e5fc49ba 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java @@ -30,9 +30,12 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.streaming.api.functions.co.CoMapFunction; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.co.CoStreamMap; import org.apache.flink.streaming.runtime.io.InputStatus; import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor; @@ -68,7 +71,7 @@ public class MultipleInputStreamTaskTest { .addInput(BasicTypeInfo.STRING_TYPE_INFO) .addInput(BasicTypeInfo.INT_TYPE_INFO) .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO) - .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator()) + .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory()) .build()) { long initialTime = 0L; @@ -98,7 +101,7 @@ public class MultipleInputStreamTaskTest { .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2) .addInput(BasicTypeInfo.INT_TYPE_INFO, 2) .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2) - .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator()) + .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory()) .build()) { ArrayDeque expectedOutput = new ArrayDeque<>(); long initialTime = 0L; @@ -148,7 +151,7 @@ public class MultipleInputStreamTaskTest { .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2) .addInput(BasicTypeInfo.INT_TYPE_INFO, 2) .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2) - .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator()) + .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory()) .build()) { ArrayDeque expectedOutput = new ArrayDeque<>(); long initialTime = 0L; @@ -217,7 +220,7 @@ public class MultipleInputStreamTaskTest { .addInput(BasicTypeInfo.STRING_TYPE_INFO) .addInput(BasicTypeInfo.STRING_TYPE_INFO) .addInput(BasicTypeInfo.STRING_TYPE_INFO) - .setupOperatorChain(new DuplicatingOperator()) + .setupOperatorChain(new DuplicatingOperatorFactory()) .chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) .chain(new OneInputStreamTaskTest.DuplicatingOperator(), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) .finish() @@ -246,9 +249,13 @@ public class MultipleInputStreamTaskTest { } } - static class DuplicatingOperator extends AbstractStreamOperator + static class DuplicatingOperator extends AbstractStreamOperatorV2 implements MultipleInputStreamOperator { + public DuplicatingOperator(StreamOperatorParameters parameters) { + super(parameters, 3); + } + @Override public List getInputs() { return Arrays.asList(new DuplicatingInput(), new DuplicatingInput(), new DuplicatingInput()); @@ -270,7 +277,7 @@ public class MultipleInputStreamTaskTest { .addInput(BasicTypeInfo.STRING_TYPE_INFO) .addInput(BasicTypeInfo.STRING_TYPE_INFO) .addInput(BasicTypeInfo.STRING_TYPE_INFO) - .setupOperatorChain(new TestBoundedMultipleInputOperator("Operator0")) + .setupOperatorChain(new TestBoundedMultipleInputOperatorFactory()) .chain(new TestBoundedOneInputStreamOperator("Operator1"), BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig())) .finish() .build(); @@ -317,7 +324,7 @@ public class MultipleInputStreamTaskTest { .addInput(BasicTypeInfo.STRING_TYPE_INFO) .addInput(BasicTypeInfo.STRING_TYPE_INFO) .addInput(BasicTypeInfo.STRING_TYPE_INFO) - .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator()) + .setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperatorFactory()) .build()) { ArrayDeque expectedOutput = new ArrayDeque<>(); @@ -352,12 +359,16 @@ public class MultipleInputStreamTaskTest { // This must only be used in one test, otherwise the static fields will be changed // by several tests concurrently private static class MapToStringMultipleInputOperator - extends AbstractStreamOperator implements MultipleInputStreamOperator { + extends AbstractStreamOperatorV2 implements MultipleInputStreamOperator { private static final long serialVersionUID = 1L; private boolean openCalled; private boolean closeCalled; + public MapToStringMultipleInputOperator(StreamOperatorParameters parameters) { + super(parameters, 3); + } + @Override public void open() throws Exception { super.open(); @@ -418,5 +429,41 @@ public class MultipleInputStreamTaskTest { return value.toString(); } } + + private static class TestBoundedMultipleInputOperatorFactory extends AbstractStreamOperatorFactory { + @Override + public > T createStreamOperator(StreamOperatorParameters parameters) { + return (T) new TestBoundedMultipleInputOperator("Operator0", parameters); + } + + @Override + public Class> getStreamOperatorClass(ClassLoader classLoader) { + return TestBoundedMultipleInputOperator.class; + } + } + + private static class DuplicatingOperatorFactory extends AbstractStreamOperatorFactory { + @Override + public > T createStreamOperator(StreamOperatorParameters parameters) { + return (T) new DuplicatingOperator(parameters); + } + + @Override + public Class> getStreamOperatorClass(ClassLoader classLoader) { + return DuplicatingOperator.class; + } + } + + private static class MapToStringMultipleInputOperatorFactory extends AbstractStreamOperatorFactory { + @Override + public > T createStreamOperator(StreamOperatorParameters parameters) { + return (T) new MapToStringMultipleInputOperator(parameters); + } + + @Override + public Class> getStreamOperatorClass(ClassLoader classLoader) { + return MapToStringMultipleInputOperator.class; + } + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java index 89d242a315e95f0739e3f9c7a0ae83605e1a1800..e6ebe84a8be1d9ecb1b85adb8803a260fd5fcfac 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java @@ -18,10 +18,11 @@ package org.apache.flink.streaming.util; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.BoundedMultiInput; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import java.util.Arrays; @@ -30,14 +31,15 @@ import java.util.List; /** * A test operator class implementing {@link BoundedMultiInput}. */ -public class TestBoundedMultipleInputOperator extends AbstractStreamOperator +public class TestBoundedMultipleInputOperator extends AbstractStreamOperatorV2 implements MultipleInputStreamOperator, BoundedMultiInput { private static final long serialVersionUID = 1L; private final String name; - public TestBoundedMultipleInputOperator(String name) { + public TestBoundedMultipleInputOperator(String name, StreamOperatorParameters parameters) { + super(parameters, 3); this.name = name; } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java index f81bac9d257bf3b2d9b23e306f02750ec8b96ef4..32b64249acb4d585d15b38b6eda256e8ce39abfb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java @@ -21,12 +21,11 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2; import org.apache.flink.streaming.api.operators.Input; import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.transformations.MultipleInputTransformation; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -84,13 +83,14 @@ public class MultipleInputITCase extends AbstractTestBase { /** * 3 input operator that sums all of it inputs. - * TODO: provide non {@link SetupableStreamOperator} variant of {@link AbstractStreamOperator}? - * TODO: provide non {@link AbstractStreamOperator} seems to pre-override processWatermark1/2 and other - * methods that are not defined there? */ - public static class SumAllInputOperator extends AbstractStreamOperator implements MultipleInputStreamOperator { + public static class SumAllInputOperator extends AbstractStreamOperatorV2 implements MultipleInputStreamOperator { private long sum; + public SumAllInputOperator(StreamOperatorParameters parameters) { + super(parameters, 3); + } + @Override public List getInputs() { return Arrays.asList( @@ -114,29 +114,15 @@ public class MultipleInputITCase extends AbstractTestBase { /** * Factory for {@link SumAllInputOperator}. */ - public static class SumAllInputOperatorFactory implements StreamOperatorFactory { - private ChainingStrategy chainingStrategy; - + public static class SumAllInputOperatorFactory extends AbstractStreamOperatorFactory { @Override public > T createStreamOperator(StreamOperatorParameters parameters) { - SumAllInputOperator sumAllInputOperator = new SumAllInputOperator(); - sumAllInputOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); - return (T) sumAllInputOperator; - } - - @Override - public void setChainingStrategy(ChainingStrategy chainingStrategy) { - this.chainingStrategy = chainingStrategy; - } - - @Override - public ChainingStrategy getChainingStrategy() { - return chainingStrategy; + return (T) new SumAllInputOperator(parameters); } @Override public Class getStreamOperatorClass(ClassLoader classLoader) { - throw new UnsupportedOperationException(); + return SumAllInputOperator.class; } } }