diff --git a/flink-connectors/flink-connector-base/pom.xml b/flink-connectors/flink-connector-base/pom.xml index b91a86d74f98c788ef3795a7496ed8ba6b5c09eb..b103a91c9ce25c127cae5e490a8afb500596eec2 100644 --- a/flink-connectors/flink-connector-base/pom.xml +++ b/flink-connectors/flink-connector-base/pom.xml @@ -50,5 +50,12 @@ test test-jar + + + org.apache.flink + flink-test-utils_${scala.binary.version} + ${project.version} + test + diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..af05ab94f8c9c26f3501826933e68e5ac8ac8c8c --- /dev/null +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/NoMoreSplitsEvent.java @@ -0,0 +1,33 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.connector.base.source.event; + +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * A source event sent from the SplitEnumerator to the SourceReader to indicate that no more + * splits will be assigned to the source reader. So once the SplitReader finishes + * reading the currently assigned splits, it can exit.
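+ *
+ * <p>For example, an enumerator for a bounded source can broadcast this event once it has
+ * handed out its last splits. A sketch only; the exact trigger is connector-specific, and the
+ * mock enumerator in this change uses the same pattern:
+ * <pre>{@code
+ * // Tell every registered reader that no further splits are coming.
+ * for (int i = 0; i < context.currentParallelism(); i++) {
+ *     context.sendEventToSourceReader(i, new NoMoreSplitsEvent());
+ * }
+ * }</pre>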
+ */ +public class NoMoreSplitsEvent implements SourceEvent { + @Override + public String toString() { + return "[NoMoreSplitsEvent]"; + } +} diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 043b735f2061621099d351f85a8393252588e803..96d813f7e53162b248d5b7eb899df37dfd042397 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -24,6 +24,7 @@ import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent; import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; @@ -81,6 +82,9 @@ public abstract class SourceReaderBase splitIter; + /** Indicates whether the SourceReader will be assigned more splits or not. */ + private boolean noMoreSplitsAssignment; + public SourceReaderBase( FutureNotifier futureNotifier, FutureCompletingBlockingQueue> elementsQueue, @@ -97,6 +101,7 @@ public abstract class SourceReaderBase(recordsWithSplitId); } - + // Process one record. if (splitIter.hasNext()) { // emit the record. - recordEmitter.emitRecord(splitIter.next(), sourceOutput, splitStates.get(splitIter.currentSplitId())); - } else { + E record = splitIter.next(); + recordEmitter.emitRecord(record, sourceOutput, splitStates.get(splitIter.currentSplitId())); + LOG.trace("Emitted record: {}", record); + } + // Do some cleanup if all the records in the current splitIter have been processed. + if (!splitIter.hasNext()) { // First remove the state of the split. splitIter.finishedSplitIds().forEach(splitStates::remove); // Handle the finished splits. onSplitFinished(splitIter.finishedSplitIds()); + // Prepare the return status based on the availability of the next element. + status = finishedOrAvailableLater(); + } else { + // There are more records from the current splitIter. + status = Status.AVAILABLE_NOW; } - // Prepare the return status based on the availability of the next element. - status = elementsQueue.isEmpty() ? Status.AVAILABLE_LATER : Status.AVAILABLE_NOW; } + LOG.trace("Source reader status: {}", status); return status; } @@ -174,11 +187,16 @@ public abstract class SourceReaderBase implements Runnable { private FetchTask fetchTask; private volatile Thread runningThread; private volatile SplitFetcherTask runningTask = null; + private volatile boolean isIdle; SplitFetcher( int id, @@ -71,6 +72,7 @@ public class SplitFetcher implements Runnable { this.assignedSplits = new HashMap<>(); this.splitReader = splitReader; this.shutdownHook = shutdownHook; + this.isIdle = true; this.wakeUp = new AtomicBoolean(false); this.closed = new AtomicBoolean(false); } @@ -82,7 +84,12 @@ public class SplitFetcher implements Runnable { // Remove the split from the assignments if it is already done.
runningThread = Thread.currentThread(); this.fetchTask = new FetchTask<>( - splitReader, elementsQueue, ids -> ids.forEach(assignedSplits::remove), runningThread); + splitReader, + elementsQueue, + ids -> { + ids.forEach(assignedSplits::remove); + updateIsIdle(); + }, runningThread); while (!closed.get()) { runOnce(); } @@ -158,6 +165,7 @@ public class SplitFetcher implements Runnable { */ public void addSplits(List splitsToAdd) { maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, splitChanges, assignedSplits)); + updateIsIdle(); wakeUp(true); } @@ -184,7 +192,7 @@ * @return true if task queue is not empty, false otherwise. */ boolean isIdle() { - return taskQueue.isEmpty() && assignedSplits.isEmpty(); + return isIdle; } /** @@ -299,4 +307,8 @@ return name; } } + + private void updateIsIdle() { + isIdle = taskQueue.isEmpty() && splitChanges.isEmpty() && assignedSplits.isEmpty(); + } } diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java index 6c88b132983ba563825c64d19ce614545a0d8a66..3b04bac570e21ccd782f7aecc3e991b73872919e 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java @@ -29,6 +29,7 @@ import org.apache.flink.util.ThrowableCatchingRunnable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; @@ -140,6 +141,24 @@ public abstract class SplitFetcherManager { return splitFetcher; } + /** + * Check and shut down the fetchers that have completed their work. + * + * @return true if all the fetchers have completed their work, false otherwise. + */ + public boolean maybeShutdownFinishedFetchers() { + Iterator>> iter = fetchers.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry> entry = iter.next(); + SplitFetcher fetcher = entry.getValue(); + if (fetcher.isIdle()) { + fetcher.shutdown(); + iter.remove(); + } + } + return fetchers.isEmpty(); + } + /** * Close the split fetcher manager. * diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java new file mode 100644 index 0000000000000000000000000000000000000000..1227e9f7066cd8c7016e695359a7adfeee6efd08 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/CoordinatedSourceITCase.java @@ -0,0 +1,81 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader; + +import org.apache.flink.api.common.accumulators.ListAccumulator; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.Test; + +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * IT case for the {@link Source} with a coordinator. + */ +public class CoordinatedSourceITCase extends AbstractTestBase { + + @Test + public void testEnumeratorReaderCommunication() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED); + DataStream stream = env.continuousSource(source, "TestingSource"); + executeAndVerify(env, stream, 20); + } + + @Test + public void testMultipleSources() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED); + MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED); + DataStream stream1 = env.continuousSource(source1, "TestingSource1"); + DataStream stream2 = env.continuousSource(source2, "TestingSource2"); + executeAndVerify(env, stream1.union(stream2), 40); + } + + @SuppressWarnings("serial") + private void executeAndVerify(StreamExecutionEnvironment env, DataStream stream, int numRecords) throws Exception { + stream.addSink(new RichSinkFunction() { + @Override + public void open(Configuration parameters) throws Exception { + getRuntimeContext().addAccumulator("result", new ListAccumulator()); + } + + @Override + public void invoke(Integer value, Context context) throws Exception { + getRuntimeContext().getAccumulator("result").add(value); + } + }); + List result = env.execute().getAccumulatorResult("result"); + Collections.sort(result); + assertEquals(numRecords, result.size()); + assertEquals(0, (int) result.get(0)); + assertEquals(numRecords - 1, (int) result.get(result.size() - 1)); + } + +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java index fc1f643cd17bb853945d2a6024d444f6e1140c9e..9f74a4560a9ffc4c0b0d6bbf595b7d3a2d3d5b52 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java @@ -127,7 +127,7 
@@ public abstract class SourceReaderTestBase extends T public void testSnapshot() throws Exception { ValidatingSourceOutput output = new ValidatingSourceOutput(); // Add a split to start the fetcher. - List splits = getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED); + List splits = getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED); // Poll 5 records. That means splits 0 and 1 will be at index 2, and split 2 will be at index 1. try (SourceReader reader = consumeRecords(splits, output, NUM_SPLITS * NUM_RECORDS_PER_SPLIT)) { diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java new file mode 100644 index 0000000000000000000000000000000000000000..137afd9bafe7064aeaad4cf60032ef29b7f32262 --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java @@ -0,0 +1,138 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.mocks; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; +import org.apache.flink.connector.base.source.reader.SourceReaderOptions; +import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue; +import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.InstantiationUtil; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +/** + * A {@link Source} implementation for unit tests of the base source connector implementation.
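+ *
+ * <p>A minimal usage sketch, mirroring the {@code CoordinatedSourceITCase} in this change
+ * (assumes a {@code StreamExecutionEnvironment} named {@code env}):
+ * <pre>{@code
+ * // Two splits with ten records each; a BOUNDED source finishes once all records are read.
+ * MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
+ * DataStream<Integer> stream = env.continuousSource(source, "TestingSource");
+ * }</pre>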
+ */ +public class MockBaseSource implements Source> { + private static final long serialVersionUID = 4445067705639284175L; + + private final int numSplits; + private final int numRecordsPerSplit; + private final int startingValue; + private final Boundedness boundedness; + + public MockBaseSource(int numSplits, int numRecordsPerSplit, Boundedness boundedness) { + this(numSplits, numRecordsPerSplit, 0, boundedness); + } + + public MockBaseSource(int numSplits, int numRecordsPerSplit, int startingValue, Boundedness boundedness) { + this.numSplits = numSplits; + this.numRecordsPerSplit = numRecordsPerSplit; + this.startingValue = startingValue; + this.boundedness = boundedness; + } + + @Override + public Boundedness getBoundedness() { + return boundedness; + } + + @Override + public SourceReader createReader(SourceReaderContext readerContext) { + FutureNotifier futureNotifier = new FutureNotifier(); + FutureCompletingBlockingQueue> elementsQueue = + new FutureCompletingBlockingQueue<>(futureNotifier); + + Configuration config = new Configuration(); + config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1); + config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L); + return new MockSourceReader( + futureNotifier, + elementsQueue, + () -> new MockSplitReader(2, true, true), + config, + readerContext); + } + + @Override + public SplitEnumerator> createEnumerator( + SplitEnumeratorContext enumContext) { + List splits = new ArrayList<>(); + for (int i = 0; i < numSplits; i++) { + int endIndex = boundedness == Boundedness.BOUNDED ? numRecordsPerSplit : Integer.MAX_VALUE; + MockSourceSplit split = new MockSourceSplit(i, 0, endIndex); + for (int j = 0; j < numRecordsPerSplit; j++) { + split.addRecord(startingValue + i * numRecordsPerSplit + j); + } + splits.add(split); + } + return new MockSplitEnumerator(splits, enumContext); + } + + @Override + public SplitEnumerator> restoreEnumerator( + SplitEnumeratorContext enumContext, + List checkpoint) throws IOException { + return new MockSplitEnumerator(checkpoint, enumContext); + } + + @Override + public SimpleVersionedSerializer getSplitSerializer() { + return new MockSourceSplitSerializer(); + } + + @Override + public SimpleVersionedSerializer> getEnumeratorCheckpointSerializer() { + return new SimpleVersionedSerializer>() { + @Override + public int getVersion() { + return 0; + } + + @Override + public byte[] serialize(List obj) throws IOException { + return InstantiationUtil.serializeObject(obj.toArray()); + } + + @Override + public List deserialize(int version, byte[] serialized) throws IOException { + MockSourceSplit[] splitArray; + try { + splitArray = InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader()); + } catch (ClassNotFoundException e) { + throw new IOException("Failed to deserialize the source split.", e); + } + return new ArrayList<>(Arrays.asList(splitArray)); + } + }; + } +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java new file mode 100644 index 0000000000000000000000000000000000000000..b3334a2431d4eddf7e55c2dc065198f10a99f26b --- /dev/null +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java @@ -0,0 +1,90 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more
contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.connector.base.source.reader.mocks; + +import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * A mock {@link SplitEnumerator} for unit tests. + */ +public class MockSplitEnumerator implements SplitEnumerator> { + private final List splits; + private final SplitEnumeratorContext context; + + public MockSplitEnumerator( + List splits, + SplitEnumeratorContext context) { + this.splits = splits; + this.context = context; + } + + @Override + public void start() { + + } + + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + + } + + @Override + public void addSplitsBack(List splits, int subtaskId) { + this.splits.addAll(splits); + } + + @Override + public void addReader(int subtaskId) { + if (context.registeredReaders().size() == context.currentParallelism()) { + int numReaders = context.registeredReaders().size(); + Map> assignment = new HashMap<>(); + for (int i = 0; i < splits.size(); i++) { + assignment + .computeIfAbsent(i % numReaders, t -> new ArrayList<>()) + .add(splits.get(i)); + } + context.assignSplits(new SplitsAssignment<>(assignment)); + splits.clear(); + for (int i = 0; i < numReaders; i++) { + context.sendEventToSourceReader(i, new NoMoreSplitsEvent()); + } + } + } + + @Override + public List snapshotState() { + return splits; + } + + @Override + public void close() throws IOException { + + } +} diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java index 7513e08aae25c71edf89d53d09d887ce9009b9de..e0edabd3840fc1376e903d476b988cacd26db127 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java @@ -90,11 +90,11 @@ public class MockSplitReader implements SplitReader { int[] record = split.getNext(blockingFetch); if (record != null) { records.add(entry.getKey(), record); - if (split.isFinished()) { - records.addFinishedSplit(entry.getKey()); - } } } + if (split.isFinished()) { + records.addFinishedSplit(entry.getKey()); + } } } catch 
(InterruptedException ie) { // Catch the exception and return the records that are already read. diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java index 24d1bee9b69192125a0c3239e2bffeffdf0aa42c..dedc45e5a95552b4d22c75707e1de44e41f1d22c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java @@ -20,7 +20,6 @@ package org.apache.flink.api.connector.source; import org.apache.flink.annotation.Public; -import java.io.Serializable; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -32,7 +31,7 @@ import java.util.concurrent.CompletableFuture; * @param The type of the the source splits. */ @Public -public interface SourceReader extends Serializable, AutoCloseable { +public interface SourceReader extends AutoCloseable { /** * Start the reader. diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py index b114ef4c3837e28da025cda09526aa7ed3983f99..c91e086701cfec3c59877ffdfe59a3ad9489e117 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py @@ -49,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase, 'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection', 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource', 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener', - 'clearJobListeners', 'getJobListeners'} + 'clearJobListeners', 'getJobListeners', "continuousSource"} if __name__ == '__main__': diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java index a6fbac84cc27328d2bcbcec6940d980b8b2b80a5..4e0d626612c9d2e2f9c4eaeb7e06d0877e3a6d56 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamSource.java @@ -20,8 +20,11 @@ package org.apache.flink.streaming.api.datastream; import org.apache.flink.annotation.Public; import org.apache.flink.api.common.operators.util.OperatorValidationUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.SourceTransformation; /** @@ -37,7 +40,7 @@ public class DataStreamSource extends SingleOutputStreamOperator { public DataStreamSource(StreamExecutionEnvironment environment, TypeInformation outTypeInfo, StreamSource operator, boolean isParallel, String sourceName) { - super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism())); + super(environment, new 
LegacySourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism())); this.isParallel = isParallel; if (!isParallel) { @@ -50,6 +53,19 @@ public class DataStreamSource extends SingleOutputStreamOperator { this.isParallel = true; } + public DataStreamSource( + StreamExecutionEnvironment environment, + Source source, + TypeInformation outTypeInfo, + String sourceName) { + super(environment, + new SourceTransformation<>( + sourceName, + new SourceOperatorFactory<>(source), + outTypeInfo, + environment.getParallelism())); + } + @Override public DataStreamSource setParallelism(int parallelism) { OperatorValidationUtils.validateParallelism(parallelism, isParallel); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 386efc78bf3dd165845269d85b0d5c0cbe1802fd..680e285d9a53d2d02676b3c7ba5da1017594887d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -32,6 +32,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.ClosureCleaner; import org.apache.flink.api.java.Utils; @@ -1591,25 +1592,52 @@ public class StreamExecutionEnvironment { @SuppressWarnings("unchecked") public DataStreamSource addSource(SourceFunction function, String sourceName, TypeInformation typeInfo) { - if (function instanceof ResultTypeQueryable) { - typeInfo = ((ResultTypeQueryable) function).getProducedType(); - } - if (typeInfo == null) { - try { - typeInfo = TypeExtractor.createTypeInfo( - SourceFunction.class, - function.getClass(), 0, null, null); - } catch (final InvalidTypesException e) { - typeInfo = (TypeInformation) new MissingTypeInfo(sourceName, e); - } - } + TypeInformation resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo); boolean isParallel = function instanceof ParallelSourceFunction; clean(function); final StreamSource sourceOperator = new StreamSource<>(function); - return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); + return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName); + } + + /** + * Add a data {@link Source} to the environment to get a {@link DataStream}. + * + * @param source + * the user defined source + * @param sourceName + * Name of the data source + * @param + * type of the returned stream + * @return the data stream constructed + */ + @PublicEvolving + public DataStreamSource continuousSource(Source source, String sourceName) { + return continuousSource(source, sourceName, null); + } + + /** + * Add a data {@link Source} to the environment to get a {@link DataStream}. 
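+ *
+ * <p>This overload is for sources whose produced type cannot be extracted automatically.
+ * A sketch only (assumes a hypothetical {@code MySource} that implements {@code Source}
+ * and produces integers):
+ * <pre>{@code
+ * DataStreamSource<Integer> stream =
+ *     env.continuousSource(new MySource(), "MySource", BasicTypeInfo.INT_TYPE_INFO);
+ * }</pre>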
+ * + * @param source + * the user defined source + * @param sourceName + * Name of the data source + * @param + * type of the returned stream + * @param typeInfo + * the user defined type information for the stream + * @return the data stream constructed + */ + @PublicEvolving + public DataStreamSource continuousSource( + Source source, + String sourceName, + TypeInformation typeInfo) { + TypeInformation resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo); + return new DataStreamSource<>(this, source, resolvedTypeInfo, sourceName); } /** @@ -2106,4 +2134,27 @@ public class StreamExecutionEnvironment { public void registerCachedFile(String filePath, String name, boolean executable) { this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable))); } + + // Private helpers. + @SuppressWarnings("unchecked") + private > T getTypeInfo( + Object source, + String sourceName, + Class baseSourceClass, + TypeInformation typeInfo) { + TypeInformation resolvedTypeInfo = typeInfo; + if (source instanceof ResultTypeQueryable) { + resolvedTypeInfo = ((ResultTypeQueryable) source).getProducedType(); + } + if (resolvedTypeInfo == null) { + try { + resolvedTypeInfo = TypeExtractor.createTypeInfo( + baseSourceClass, + source.getClass(), 0, null, null); + } catch (final InvalidTypesException e) { + resolvedTypeInfo = (TypeInformation) new MissingTypeInfo(sourceName, e); + } + } + return (T) resolvedTypeInfo; + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 83d9acaaa0ba78f069266ea339dd186190c99076..7df5c2836093ea969f55eaa93478a87659bc17d3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.transformations.ShuffleMode; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; @@ -47,6 +48,7 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; @@ -234,24 +236,46 @@ public class StreamGraph implements Pipeline { return !vertexIDtoLoopTimeout.isEmpty(); } - public void addSource(Integer vertexID, - @Nullable String slotSharingGroup, - @Nullable String coLocationGroup, - StreamOperatorFactory operatorFactory, - TypeInformation inTypeInfo, - TypeInformation outTypeInfo, - String operatorName) { + public void addSource( + Integer vertexID, + @Nullable String slotSharingGroup, + @Nullable 
String coLocationGroup, + SourceOperatorFactory operatorFactory, + TypeInformation inTypeInfo, + TypeInformation outTypeInfo, + String operatorName) { + addOperator( + vertexID, + slotSharingGroup, + coLocationGroup, + operatorFactory, + inTypeInfo, + outTypeInfo, + operatorName, + SourceOperatorStreamTask.class); + sources.add(vertexID); + } + + public void addLegacySource( + Integer vertexID, + @Nullable String slotSharingGroup, + @Nullable String coLocationGroup, + StreamOperatorFactory operatorFactory, + TypeInformation inTypeInfo, + TypeInformation outTypeInfo, + String operatorName) { addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName); sources.add(vertexID); } - public void addSink(Integer vertexID, - @Nullable String slotSharingGroup, - @Nullable String coLocationGroup, - StreamOperatorFactory operatorFactory, - TypeInformation inTypeInfo, - TypeInformation outTypeInfo, - String operatorName) { + public void addSink( + Integer vertexID, + @Nullable String slotSharingGroup, + @Nullable String coLocationGroup, + StreamOperatorFactory operatorFactory, + TypeInformation inTypeInfo, + TypeInformation outTypeInfo, + String operatorName) { addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName); sinks.add(vertexID); } @@ -264,13 +288,23 @@ public class StreamGraph implements Pipeline { TypeInformation inTypeInfo, TypeInformation outTypeInfo, String operatorName) { + Class invokableClass = + operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class; + addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, + outTypeInfo, operatorName, invokableClass); + } - if (operatorFactory.isStreamSource()) { - addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName); - } else { - addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName); - } + private void addOperator( + Integer vertexID, + @Nullable String slotSharingGroup, + @Nullable String coLocationGroup, + StreamOperatorFactory operatorFactory, + TypeInformation inTypeInfo, + TypeInformation outTypeInfo, + String operatorName, + Class invokableClass) { + addNode(vertexID, slotSharingGroup, coLocationGroup, invokableClass, operatorFactory, operatorName); setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo)); if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) { @@ -340,25 +374,26 @@ public class StreamGraph implements Pipeline { } } - protected StreamNode addNode(Integer vertexID, - @Nullable String slotSharingGroup, - @Nullable String coLocationGroup, - Class vertexClass, - StreamOperatorFactory operatorFactory, - String operatorName) { + protected StreamNode addNode( + Integer vertexID, + @Nullable String slotSharingGroup, + @Nullable String coLocationGroup, + Class vertexClass, + StreamOperatorFactory operatorFactory, + String operatorName) { if (streamNodes.containsKey(vertexID)) { throw new RuntimeException("Duplicate vertexID " + vertexID); } StreamNode vertex = new StreamNode( - vertexID, - slotSharingGroup, - coLocationGroup, - operatorFactory, - operatorName, - new ArrayList>(), - vertexClass); + vertexID, + slotSharingGroup, + coLocationGroup, + operatorFactory, + operatorName, + new ArrayList>(), + vertexClass); streamNodes.put(vertexID, vertex); diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index 4cf427b1bf6f9dcbdc2f52474a0c5f95c9d29a11..c9060b987a1f0c1ed2c317777b27ebebd337c383 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTrans import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation; import org.apache.flink.streaming.api.transformations.FeedbackTransformation; import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation; +import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.PhysicalTransformation; @@ -254,8 +255,10 @@ public class StreamGraphGenerator { transformedIds = transformTwoInputTransform((TwoInputTransformation) transform); } else if (transform instanceof AbstractMultipleInputTransformation) { transformedIds = transformMultipleInputTransform((AbstractMultipleInputTransformation) transform); - } else if (transform instanceof SourceTransformation) { + } else if (transform instanceof SourceTransformation) { transformedIds = transformSource((SourceTransformation) transform); + } else if (transform instanceof LegacySourceTransformation) { + transformedIds = transformLegacySource((LegacySourceTransformation) transform); } else if (transform instanceof SinkTransformation) { transformedIds = transformSink((SinkTransformation) transform); } else if (transform instanceof UnionTransformation) { @@ -584,6 +587,26 @@ public class StreamGraphGenerator { null, source.getOutputType(), "Source: " + source.getName()); + int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ? + source.getParallelism() : executionConfig.getParallelism(); + streamGraph.setParallelism(source.getId(), parallelism); + streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism()); + return Collections.singleton(source.getId()); + } + + /** + * Transforms a {@code LegacySourceTransformation}. + */ + private Collection transformLegacySource(LegacySourceTransformation source) { + String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList()); + + streamGraph.addLegacySource(source.getId(), + slotSharingGroup, + source.getCoLocationGroupKey(), + source.getOperatorFactory(), + null, + source.getOutputType(), + "Source: " + source.getName()); if (source.getOperatorFactory() instanceof InputFormatOperatorFactory) { streamGraph.setInputFormat(source.getId(), ((InputFormatOperatorFactory) source.getOperatorFactory()).getInputFormat()); @@ -596,7 +619,7 @@ public class StreamGraphGenerator { } /** - * Transforms a {@code SourceTransformation}. + * Transforms a {@code SinkTransformation}. 
*/ private Collection transformSink(SinkTransformation sink) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 8df1206d7555a2b108c27f69a033057eeb887dd8..8f25d6c7a61528dc0ca6471c0495740389cd25cf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -25,8 +25,11 @@ import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.streaming.api.collector.selector.OutputSelector; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; @@ -36,6 +39,7 @@ import javax.annotation.Nullable; import java.io.Serializable; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import static org.apache.flink.util.Preconditions.checkArgument; @@ -94,14 +98,13 @@ public class StreamNode implements Serializable { } public StreamNode( - Integer id, - @Nullable String slotSharingGroup, - @Nullable String coLocationGroup, - StreamOperatorFactory operatorFactory, - String operatorName, - List> outputSelector, - Class jobVertexClass) { - + Integer id, + @Nullable String slotSharingGroup, + @Nullable String coLocationGroup, + StreamOperatorFactory operatorFactory, + String operatorName, + List> outputSelector, + Class jobVertexClass) { this.id = id; this.operatorName = operatorName; this.operatorFactory = operatorFactory; @@ -336,6 +339,17 @@ public class StreamNode implements Serializable { this.userHash = userHash; } + public Optional getCoordinatorProvider( + String operatorName, + OperatorID operatorID) { + if (operatorFactory instanceof CoordinatedOperatorFactory) { + return Optional.of(((CoordinatedOperatorFactory) operatorFactory) + .getCoordinatorProvider(operatorName, operatorID)); + } else { + return Optional.empty(); + } + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 7f093a0947962eb4f572b9acb97cf3add8f4b177..9022eaa9eb76ad87bdfbb89cd6b3040653c09929 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion; import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.streaming.api.CheckpointingMode; @@ -165,9 +166,7 @@ public class StreamingJobGraphGenerator { legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph)); } - Map>> chainedOperatorHashes = new HashMap<>(); - - setChaining(hashes, legacyHashes, chainedOperatorHashes); + setChaining(hashes, legacyHashes); setPhysicalEdges(); @@ -251,20 +250,17 @@ public class StreamingJobGraphGenerator { * *

This will recursively create all {@link JobVertex} instances. */ - private void setChaining(Map hashes, List> legacyHashes, Map>> chainedOperatorHashes) { + private void setChaining(Map hashes, List> legacyHashes) { for (Integer sourceNodeId : streamGraph.getSourceIDs()) { - createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes); + createChain( + sourceNodeId, + 0, + new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph)); } } - private List createChain( - Integer startNodeId, - Integer currentNodeId, - Map hashes, - List> legacyHashes, - int chainIndex, - Map>> chainedOperatorHashes) { - + private List createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) { + Integer startNodeId = chainInfo.getStartNodeId(); if (!builtVertices.contains(startNodeId)) { List transitiveOutEdges = new ArrayList(); @@ -284,28 +280,20 @@ public class StreamingJobGraphGenerator { for (StreamEdge chainable : chainableOutputs) { transitiveOutEdges.addAll( - createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes)); + createChain(chainable.getTargetId(), chainIndex + 1, chainInfo)); } for (StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); - createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes); - } - - List> operatorHashes = - chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>()); - - byte[] primaryHashBytes = hashes.get(currentNodeId); - OperatorID currentOperatorId = new OperatorID(primaryHashBytes); - - for (Map legacyHash : legacyHashes) { - operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId))); + createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId())); } chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs)); chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs)); + OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId)); + if (currentNode.getInputFormat() != null) { getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat()); } @@ -315,7 +303,7 @@ public class StreamingJobGraphGenerator { } StreamConfig config = currentNodeId.equals(startNodeId) - ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes) + ? createJobVertex(startNodeId, chainInfo) : new StreamConfig(new Configuration()); setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs); @@ -393,14 +381,12 @@ public class StreamingJobGraphGenerator { private StreamConfig createJobVertex( Integer streamNodeId, - Map hashes, - List> legacyHashes, - Map>> chainedOperatorHashes) { + OperatorChainInfo chainInfo) { JobVertex jobVertex; StreamNode streamNode = streamGraph.getStreamNode(streamNodeId); - byte[] hash = hashes.get(streamNodeId); + byte[] hash = chainInfo.getHash(streamNodeId); if (hash == null) { throw new IllegalStateException("Cannot find node hash. 
" + @@ -409,7 +395,7 @@ public class StreamingJobGraphGenerator { JobVertexID jobVertexId = new JobVertexID(hash); - List> chainedOperators = chainedOperatorHashes.get(streamNodeId); + List> chainedOperators = chainInfo.getChainedOperatorHashes(streamNodeId); List operatorIDPairs = new ArrayList<>(); if (chainedOperators != null) { for (Tuple2 chainedOperator : chainedOperators) { @@ -434,6 +420,15 @@ public class StreamingJobGraphGenerator { operatorIDPairs); } + for (OperatorCoordinator.Provider coordinatorProvider : chainInfo.getCoordinatorProviders()) { + try { + jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider)); + } catch (IOException e) { + throw new FlinkRuntimeException(String.format( + "Coordinator Provider for node %s is not serializable.", chainedNames.get(streamNodeId))); + } + } + jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId)); jobVertex.setInvokableClass(streamNode.getJobVertexClass()); @@ -973,4 +968,67 @@ public class StreamingJobGraphGenerator { jobGraph.setSnapshotSettings(settings); } + + /** + * A private class to help maintain the information of an operator chain during the recursive call in + * {@link #createChain(Integer, int, OperatorChainInfo)}. + */ + private static class OperatorChainInfo { + private final Integer startNodeId; + private final Map hashes; + private final List> legacyHashes; + private final Map>> chainedOperatorHashes; + private final List coordinatorProviders; + private final StreamGraph streamGraph; + + private OperatorChainInfo( + int startNodeId, + Map hashes, + List> legacyHashes, + StreamGraph streamGraph) { + this.startNodeId = startNodeId; + this.hashes = hashes; + this.legacyHashes = legacyHashes; + this.chainedOperatorHashes = new HashMap<>(); + this.coordinatorProviders = new ArrayList<>(); + this.streamGraph = streamGraph; + } + + byte[] getHash(Integer streamNodeId) { + return hashes.get(streamNodeId); + } + + private Integer getStartNodeId() { + return startNodeId; + } + + private List> getChainedOperatorHashes(int startNodeId) { + return chainedOperatorHashes.get(startNodeId); + } + + private List getCoordinatorProviders() { + return coordinatorProviders; + } + + private OperatorID addNodeToChain(int currentNodeId, String operatorName) { + List> operatorHashes = + chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>()); + + byte[] primaryHashBytes = hashes.get(currentNodeId); + + for (Map legacyHash : legacyHashes) { + operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId))); + } + + streamGraph + .getStreamNode(currentNodeId) + .getCoordinatorProvider(operatorName, new OperatorID(getHash(currentNodeId))) + .map(coordinatorProviders::add); + return new OperatorID(primaryHashBytes); + } + + private OperatorChainInfo newChain(Integer startNodeId) { + return new OperatorChainInfo(startNodeId, hashes, legacyHashes, streamGraph); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index e4971d2cc9996b59b76d0d725da2b5d7ccaea5b0..d8907ac0d662a28aabb919f476389ff1f81f46b6 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -64,6 +64,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull; public class SourceOperator extends AbstractStreamOperator implements OperatorEventHandler, PushingAsyncDataInput { + private static final long serialVersionUID = 1405537676017904695L; // Package private for unit test. static final ListStateDescriptor SPLITS_STATE_DESC = diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java new file mode 100644 index 0000000000000000000000000000000000000000..f43919af92cdfa622d3df446441c18952e8ef0b8 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/LegacySourceTransformation.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.transformations; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.dag.Transformation; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamSource; + +import java.util.Collection; +import java.util.Collections; + +/** + * This represents a Source. This does not actually transform anything since it has no inputs but + * it is the root {@code Transformation} of any topology. + * + * @param The type of the elements that this source produces + */ +@Internal +public class LegacySourceTransformation extends PhysicalTransformation { + + private final StreamOperatorFactory operatorFactory; + + /** + * Creates a new {@code LegacySourceTransformation} from the given operator. 
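+ *
+ * <p>Note that user programs do not normally create this transformation directly: it is
+ * produced by {@code DataStreamSource} when a legacy {@code SourceFunction} is added via
+ * {@code StreamExecutionEnvironment#addSource}, as opposed to the new
+ * {@code SourceTransformation} created for {@code continuousSource}.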
+ * + * @param name The name of the {@code LegacySourceTransformation}, this will be shown in Visualizations and the Log + * @param operator The {@code StreamSource} that is the operator of this Transformation + * @param outputType The type of the elements produced by this {@code LegacySourceTransformation} + * @param parallelism The parallelism of this {@code LegacySourceTransformation} + */ + public LegacySourceTransformation( + String name, + StreamSource operator, + TypeInformation outputType, + int parallelism) { + this(name, SimpleOperatorFactory.of(operator), outputType, parallelism); + } + + public LegacySourceTransformation( + String name, + StreamOperatorFactory operatorFactory, + TypeInformation outputType, + int parallelism) { + super(name, outputType, parallelism); + this.operatorFactory = operatorFactory; + } + + @VisibleForTesting + public StreamSource getOperator() { + return (StreamSource) ((SimpleOperatorFactory) operatorFactory).getOperator(); + } + + /** + * Returns the {@code StreamOperatorFactory} of this {@code LegacySourceTransformation}. + */ + public StreamOperatorFactory getOperatorFactory() { + return operatorFactory; + } + + @Override + public Collection> getTransitivePredecessors() { + return Collections.singleton(this); + } + + @Override + public final void setChainingStrategy(ChainingStrategy strategy) { + operatorFactory.setChainingStrategy(strategy); + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java index 94cb93229978c10c630d1ce6a15926be179a5fca..2b82e2755c14c4533203783c894a124927487ae0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/SourceTransformation.java @@ -1,81 +1,62 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. */ package org.apache.flink.streaming.api.transformations; import org.apache.flink.annotation.Internal; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Source; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; -import org.apache.flink.streaming.api.operators.StreamOperatorFactory; -import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.operators.SourceOperator; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import java.util.Collection; import java.util.Collections; /** - * This represents a Source. This does not actually transform anything since it has no inputs but - * it is the root {@code Transformation} of any topology. - * - * @param The type of the elements that this source produces + * A {@link PhysicalTransformation} for {@link Source}. */ @Internal -public class SourceTransformation extends PhysicalTransformation { - - private final StreamOperatorFactory operatorFactory; - +public class SourceTransformation extends PhysicalTransformation { + private final SourceOperatorFactory sourceFactory; /** - * Creates a new {@code SourceTransformation} from the given operator. + * Creates a new {@code Transformation} with the given name, output type and parallelism. * - * @param name The name of the {@code SourceTransformation}, this will be shown in Visualizations and the Log - * @param operator The {@code StreamSource} that is the operator of this Transformation - * @param outputType The type of the elements produced by this {@code SourceTransformation} - * @param parallelism The parallelism of this {@code SourceTransformation} + * @param name The name of the {@code Transformation}, this will be shown in Visualizations and the Log + * @param sourceFactory The operator factory for {@link SourceOperator}. + * @param outputType The output type of this {@code Transformation} + * @param parallelism The parallelism of this {@code Transformation} */ public SourceTransformation( String name, - StreamSource operator, - TypeInformation outputType, - int parallelism) { - this(name, SimpleOperatorFactory.of(operator), outputType, parallelism); - } - - public SourceTransformation( - String name, - StreamOperatorFactory operatorFactory, - TypeInformation outputType, + SourceOperatorFactory sourceFactory, + TypeInformation outputType, int parallelism) { super(name, outputType, parallelism); - this.operatorFactory = operatorFactory; - } - - @VisibleForTesting - public StreamSource getOperator() { - return (StreamSource) ((SimpleOperatorFactory) operatorFactory).getOperator(); + this.sourceFactory = sourceFactory; } /** - * Returns the {@code StreamOperatorFactory} of this {@code SourceTransformation}. + * Returns the {@code SourceOperatorFactory} of this {@code SourceTransformation}.
*/ - public StreamOperatorFactory getOperatorFactory() { - return operatorFactory; + public SourceOperatorFactory getOperatorFactory() { + return sourceFactory; } @Override @@ -85,6 +66,6 @@ public class SourceTransformation extends PhysicalTransformation { @Override public final void setChainingStrategy(ChainingStrategy strategy) { - operatorFactory.setChainingStrategy(strategy); + sourceFactory.setChainingStrategy(strategy); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index 3ee0002b0c24eecb097610a8a8711c3e73aa93f0..1fb977a9a46168dc81f4855f69061986a449cc6e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -28,6 +28,9 @@ import org.apache.flink.api.common.operators.ResourceSpec; import org.apache.flink.api.common.operators.util.UserCodeWrapper; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.mocks.MockSource; +import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.io.TypeSerializerInputFormat; import org.apache.flink.api.java.tuple.Tuple2; @@ -43,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.util.TaskConfig; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; @@ -56,18 +60,27 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory; +import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory; import org.apache.flink.streaming.api.operators.MailboxExecutor; import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; +import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import org.apache.flink.streaming.api.operators.StreamMap; +import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.api.operators.StreamOperatorParameters; import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; +import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.ShuffleMode; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import 
org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; +import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask; import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator; import org.apache.flink.util.Collector; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; @@ -234,6 +247,26 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertTrue(printConfig.isChainEnd()); } + @Test + public void testOperatorCoordinatorAddedToJobVertex() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream stream = + env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestingSource"); + + OneInputTransformation resultTransform = new OneInputTransformation( + stream.getTransformation(), + "AnyName", + new CoordinatedTransformOperatorFactory(), + BasicTypeInfo.INT_TYPE_INFO, + env.getParallelism()); + + new TestingSingleOutputStreamOperator<>(env, resultTransform).print(); + + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); + + assertEquals(2, jobGraph.getVerticesAsArray()[0].getOperatorCoordinators().size()); + } + /** * Verifies that the resources are merged correctly for chained operators (covers source and sink cases) * when generating job graph. @@ -422,6 +455,30 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertTrue(sinkFormat2 instanceof DiscardingOutputFormat); } + @Test + public void testCoordinatedOperator() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream source = + env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestSource"); + source.addSink(new DiscardingSink<>()); + + StreamGraph streamGraph = env.getStreamGraph(); + JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph); + // There should be only one job vertex. + assertEquals(1, jobGraph.getNumberOfVertices()); + + JobVertex jobVertex = jobGraph.getVerticesAsArray()[0]; + List> coordinatorProviders = jobVertex.getOperatorCoordinators(); + // There should be only one coordinator provider. + assertEquals(1, coordinatorProviders.size()); + // The invokable class should be SourceOperatorStreamTask. + final ClassLoader classLoader = getClass().getClassLoader(); + assertEquals(SourceOperatorStreamTask.class, jobVertex.getInvokableClass(classLoader)); + StreamOperatorFactory operatorFactory = + new StreamConfig(jobVertex.getConfiguration()).getStreamOperatorFactory(classLoader); + assertTrue(operatorFactory instanceof SourceOperatorFactory); + } + /** * Test setting shuffle mode to {@link ShuffleMode#PIPELINED}. 
*/ @@ -890,4 +947,42 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { } } + // ------------ private classes ------------- + private static class CoordinatedTransformOperatorFactory + extends AbstractStreamOperatorFactory + implements CoordinatedOperatorFactory, OneInputStreamOperatorFactory { + + @Override + public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) { + return new OperatorCoordinator.Provider() { + @Override + public OperatorID getOperatorId() { + return null; + } + + @Override + public OperatorCoordinator create(OperatorCoordinator.Context context) { + return null; + } + }; + } + + @Override + public > T createStreamOperator(StreamOperatorParameters parameters) { + return null; + } + + @Override + public Class getStreamOperatorClass(ClassLoader classLoader) { + return null; + } + } + + private static class TestingSingleOutputStreamOperator extends SingleOutputStreamOperator { + + public TestingSingleOutputStreamOperator(StreamExecutionEnvironment environment, + Transformation transformation) { + super(environment, transformation); + } + } } diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala index 0c7dd6c5d3f87300b068654429f58695e52348a7..37bb4f99c8f4849275f3d8123334a4d5437bc9b6 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala @@ -23,6 +23,7 @@ import org.apache.flink.annotation.{Internal, Public, PublicEvolving} import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat} import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.connector.source.{Source, SourceSplit} import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.api.scala.ClosureCleaner @@ -660,6 +661,15 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { addSource(sourceFunction) } + /** + * Creates a DataStream from the given [[Source]]. + */ + def continuousSource[T: TypeInformation]( + source: Source[T, _ <: SourceSplit, _], + sourceName: String): DataStream[T] = { + asScalaStream(javaEnv.continuousSource(source, sourceName)) + } + /** * Triggers the program execution. The environment will execute all parts of * the program that have resulted in a "sink" operation. Sink operations are
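
The Scala continuousSource above simply delegates to the Java StreamExecutionEnvironment.continuousSource introduced by this change. A minimal usage sketch of the new API, assuming the MockSource test source exercised in the tests above and assuming it produces Integer records:

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ContinuousSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // continuousSource() builds a SourceTransformation backed by a
        // SourceOperatorFactory, rather than the legacy StreamSource
        // operator that addSource() produces.
        DataStream<Integer> stream =
                env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "ExampleSource");
        stream.print();
        env.execute("continuous-source-example");
    }
}

A bounded MockSource with a single split is enough to exercise the SourceOperatorStreamTask path that the job-graph tests above assert on.
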
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala index 095add95cef24c14bfdf485da47e1b5dd7c1466e..e5020cc8266b8be7205ee3275a31a6864550764f 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamExecTableSourceScan.scala @@ -79,7 +79,7 @@ class StreamExecTableSourceScan( name: String, outTypeInfo: RowDataTypeInfo): Transformation[RowData] = { // It's better to use StreamExecutionEnvironment.createInput() - // rather than addSource() for streaming, because it take care of checkpoint. + // rather than addLegacySource() for streaming, because it takes care of checkpointing. env .createInput(inputFormat, outTypeInfo) .name(name) diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/BatchExecutorTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/BatchExecutorTest.java index fe6b5c62df8b439f7a0ca3bcf9a417ed6157690a..7a1feb787673a835a07543ed42fc8cb31ebfadde 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/BatchExecutorTest.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/delegation/BatchExecutorTest.java @@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.streaming.api.transformations.LegacySourceTransformation; import org.apache.flink.table.api.TableConfig; import org.apache.flink.util.TestLogger; @@ -47,7 +47,7 @@ public class BatchExecutorTest extends TestLogger { public BatchExecutorTest() { batchExecutor = new BatchExecutor(LocalStreamEnvironment.getExecutionEnvironment()); - final Transformation testTransform = new SourceTransformation<>( + final Transformation testTransform = new LegacySourceTransformation<>( "MockTransform", new StreamSource<>(new SourceFunction() { @Override diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala index fa0add7a689ddcad0e4c67ff3d8c749f663ec3b1..cc1bae2cfdc4e54bbd29eb6a40ac62186b952808 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala @@ -366,7 +366,7 @@ class HarnessTestBase extends StreamingWithStateTestBase { } case union: UnionTransformation[_] => extractFromInputs(union.getInputs.toSeq: _*) case p: PartitionTransformation[_] => extractFromInputs(p.getInput) - case _: SourceTransformation[_] => null + case _: LegacySourceTransformation[_] => null case _ => throw new
UnsupportedOperationException("This should not happen.") } }
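
After the rename, SourceTransformation refers exclusively to FLIP-27 sources while LegacySourceTransformation carries the old SourceFunction path, so any code that inspects a topology's root transformations must handle both classes, as the HarnessTestBase change above does. A minimal sketch of that distinction; SourceKind is a hypothetical helper name, not part of this change:

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;

// Hypothetical helper: classifies a root Transformation after the rename.
final class SourceKind {
    static String describe(Transformation<?> transform) {
        if (transform instanceof LegacySourceTransformation) {
            // Old path: wraps a StreamSource operator around a SourceFunction.
            return "legacy SourceFunction source";
        }
        if (transform instanceof SourceTransformation) {
            // New path: a SourceOperatorFactory whose coordinator provider
            // is attached to the JobVertex by StreamingJobGraphGenerator.
            return "FLIP-27 source";
        }
        return "not a source";
    }
}
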