Commit 7abced6d authored by Jiangjie (Becket) Qin

[FLINK-15102][datastream/api] Add continuousSource() methods to the StreamExecutionEnvironment.

The patch also adds a new SourceEvent type, NoMoreSplitsEvent, to allow the SourceReaderBase to exit after all its work is done.
Parent 29a224db
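For orientation, a minimal usage sketch of the new API, based on the CoordinatedSourceITCase added in this patch. MockBaseSource is the test source introduced here; any Source<Integer, ?, ?> implementation would work in its place.

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The new entry point: wraps the Source in a SourceOperatorFactory-backed transformation.
DataStream<Integer> stream = env.continuousSource(
        new MockBaseSource(2, 10, Boundedness.BOUNDED), "TestingSource");
stream.print();
env.execute();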
......@@ -50,5 +50,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.flink.connector.base.source.event;
import org.apache.flink.api.connector.source.SourceEvent;
/**
* A source event sent from the SplitEnumerator to the SourceReader to indicate that no more
* splits will be assigned to the source reader. Once the SourceReader finishes reading its
* currently assigned splits, it can exit.
*/
public class NoMoreSplitsEvent implements SourceEvent {
@Override
public String toString() {
return "[NoMoreSplitEvent]";
}
}
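As a hedged sketch of the producer side (mirroring MockSplitEnumerator.addReader() later in this patch), an enumerator that has handed out its last splits broadcasts the event to every registered reader:

// Sketch only: after the final assignment, signal all readers.
for (int subtask = 0; subtask < context.currentParallelism(); subtask++) {
    context.sendEventToSourceReader(subtask, new NoMoreSplitsEvent());
}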
......@@ -24,6 +24,7 @@ import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
......@@ -81,6 +82,9 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
/** Iterator over the most recent fetch, kept so its records are fully handled before the next fetch. */
private SplitsRecordIterator<E> splitIter;
/** Indicates whether this SourceReader will be assigned any more splits. */
private boolean noMoreSplitsAssignment;
public SourceReaderBase(
FutureNotifier futureNotifier,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
......@@ -97,6 +101,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
this.options = new SourceReaderOptions(config);
this.config = config;
this.context = context;
this.noMoreSplitsAssignment = false;
}
@Override
......@@ -118,25 +123,33 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
Status status;
if (newFetch && recordsWithSplitId == null) {
// No element available, set to available later if needed.
status = Status.AVAILABLE_LATER;
status = finishedOrAvailableLater();
} else {
// Update the record iterator if it is a new fetch.
if (newFetch) {
splitIter = new SplitsRecordIterator<>(recordsWithSplitId);
}
// Process one record.
if (splitIter.hasNext()) {
// emit the record.
recordEmitter.emitRecord(splitIter.next(), sourceOutput, splitStates.get(splitIter.currentSplitId()));
} else {
E record = splitIter.next();
recordEmitter.emitRecord(record, sourceOutput, splitStates.get(splitIter.currentSplitId()));
LOG.trace("Emitted record: {}", record);
}
// Do some cleanup if all the records in the current splitIter have been processed.
if (!splitIter.hasNext()) {
// First remove the state of the split.
splitIter.finishedSplitIds().forEach(splitStates::remove);
// Handle the finished splits.
onSplitFinished(splitIter.finishedSplitIds());
// Prepare the return status based on the availability of the next element.
status = finishedOrAvailableLater();
} else {
// There are more records from the current splitIter.
status = Status.AVAILABLE_NOW;
}
// Prepare the return status based on the availability of the next element.
status = elementsQueue.isEmpty() ? Status.AVAILABLE_LATER : Status.AVAILABLE_NOW;
}
LOG.trace("Source reader status: {}", status);
return status;
}
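The three Status values drive the caller's poll loop. A hedged sketch of that loop, where reader and output stand for a SourceReader and its SourceOutput; the enum constants come from this class, while the availability-future name isAvailable() is an assumption based on the CompletableFuture import in SourceReader:

while (true) {
    SourceReader.Status status = reader.pollNext(output);
    if (status == SourceReader.Status.FINISHED) {
        break;                      // NoMoreSplitsEvent received, fetchers shut down, queue drained
    }
    if (status == SourceReader.Status.AVAILABLE_LATER) {
        reader.isAvailable().get(); // completed by the FutureNotifier, e.g. on NoMoreSplitsEvent
    }
    // AVAILABLE_NOW: poll again immediately.
}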
......@@ -174,11 +187,16 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
@Override
public void handleSourceEvents(SourceEvent sourceEvent) {
// Default action is do nothing.
LOG.trace("Handling source event: {}", sourceEvent);
if (sourceEvent instanceof NoMoreSplitsEvent) {
noMoreSplitsAssignment = true;
futureNotifier.notifyComplete();
}
}
@Override
public void close() throws Exception {
LOG.info("Closing Source Reader.");
splitFetcherManager.close(options.sourceReaderCloseTimeout);
}
......@@ -202,4 +220,16 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
* @return an immutable Split state.
*/
protected abstract SplitT toSplitType(String splitId, SplitStateT splitState);
// ------------------ private helper methods ---------------------
private Status finishedOrAvailableLater() {
boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
boolean allElementsEmitted = elementsQueue.isEmpty() && (splitIter == null || !splitIter.hasNext());
if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
return Status.FINISHED;
} else {
return Status.AVAILABLE_LATER;
}
}
}
......@@ -57,6 +57,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private FetchTask<E, SplitT> fetchTask;
private volatile Thread runningThread;
private volatile SplitFetcherTask runningTask = null;
private volatile boolean isIdle;
SplitFetcher(
int id,
......@@ -71,6 +72,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
this.assignedSplits = new HashMap<>();
this.splitReader = splitReader;
this.shutdownHook = shutdownHook;
this.isIdle = true;
this.wakeUp = new AtomicBoolean(false);
this.closed = new AtomicBoolean(false);
}
......@@ -82,7 +84,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
// Remove the split from the assignments if it is already done.
runningThread = Thread.currentThread();
this.fetchTask = new FetchTask<>(
splitReader, elementsQueue, ids -> ids.forEach(assignedSplits::remove), runningThread);
splitReader,
elementsQueue,
ids -> {
ids.forEach(assignedSplits::remove);
updateIsIdle();
}, runningThread);
while (!closed.get()) {
runOnce();
}
......@@ -158,6 +165,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
*/
public void addSplits(List<SplitT> splitsToAdd) {
maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, splitChanges, assignedSplits));
updateIsIdle();
wakeUp(true);
}
......@@ -184,7 +192,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
* @return true if the fetcher has no pending tasks, split changes, or assigned splits, false otherwise.
*/
boolean isIdle() {
return taskQueue.isEmpty() && assignedSplits.isEmpty();
return isIdle;
}
/**
......@@ -299,4 +307,8 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
return name;
}
}
private void updateIsIdle() {
isIdle = taskQueue.isEmpty() && splitChanges.isEmpty() && assignedSplits.isEmpty();
}
}
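The volatile isIdle flag is the point of this change: idleness is recomputed on every mutation (task enqueued, splits added, splits finished), so SplitFetcherManager can read it from another thread without locking. The pattern in miniature (illustrative plain Java, not SplitFetcher's actual code):

final class IdleTracker {
    private final java.util.Queue<Runnable> tasks = new java.util.concurrent.ConcurrentLinkedQueue<>();
    private volatile boolean idle = true;

    void enqueue(Runnable task) {
        tasks.add(task);
        updateIdle();      // refresh the flag on every mutation
    }

    void taskDone() {
        updateIdle();
    }

    boolean isIdle() {     // cheap cross-thread read, as in SplitFetcher.isIdle()
        return idle;
    }

    private void updateIdle() {
        idle = tasks.isEmpty();
    }
}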
......@@ -29,6 +29,7 @@ import org.apache.flink.util.ThrowableCatchingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
......@@ -140,6 +141,24 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
return splitFetcher;
}
/**
* Check for and shut down the fetchers that have completed their work.
*
* @return true if all the fetchers have completed their work, false otherwise.
*/
public boolean maybeShutdownFinishedFetchers() {
Iterator<Map.Entry<Integer, SplitFetcher<E, SplitT>>> iter = fetchers.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Integer, SplitFetcher<E, SplitT>> entry = iter.next();
SplitFetcher<E, SplitT> fetcher = entry.getValue();
if (fetcher.isIdle()) {
fetcher.shutdown();
iter.remove();
}
}
return fetchers.isEmpty();
}
/**
* Close the split fetcher manager.
*
......
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.flink.connector.base.source.reader;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.mocks.MockBaseSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Test;
import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
/**
* IT case for the {@link Source} with a coordinator.
*/
public class CoordinatedSourceITCase extends AbstractTestBase {
@Test
public void testEnumeratorReaderCommunication() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source = new MockBaseSource(2, 10, Boundedness.BOUNDED);
DataStream<Integer> stream = env.continuousSource(source, "TestingSource");
executeAndVerify(env, stream, 20);
}
@Test
public void testMultipleSources() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
MockBaseSource source1 = new MockBaseSource(2, 10, Boundedness.BOUNDED);
MockBaseSource source2 = new MockBaseSource(2, 10, 20, Boundedness.BOUNDED);
DataStream<Integer> stream1 = env.continuousSource(source1, "TestingSource1");
DataStream<Integer> stream2 = env.continuousSource(source2, "TestingSource2");
executeAndVerify(env, stream1.union(stream2), 40);
}
@SuppressWarnings("serial")
private void executeAndVerify(StreamExecutionEnvironment env, DataStream<Integer> stream, int numRecords) throws Exception {
stream.addSink(new RichSinkFunction<Integer>() {
@Override
public void open(Configuration parameters) throws Exception {
getRuntimeContext().addAccumulator("result", new ListAccumulator<Integer>());
}
@Override
public void invoke(Integer value, Context context) throws Exception {
getRuntimeContext().getAccumulator("result").add(value);
}
});
List<Integer> result = env.execute().getAccumulatorResult("result");
Collections.sort(result);
assertEquals(numRecords, result.size());
assertEquals(0, (int) result.get(0));
assertEquals(numRecords - 1, (int) result.get(result.size() - 1));
}
}
......@@ -127,7 +127,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
public void testSnapshot() throws Exception {
ValidatingSourceOutput output = new ValidatingSourceOutput();
// Add a split to start the fetcher.
List<SplitT> splits = getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.BOUNDED);
List<SplitT> splits = getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED);
// Poll 5 records. That means splits 0 and 1 will be at index 2, and split 2 will be at index 1.
try (SourceReader<Integer, SplitT> reader =
consumeRecords(splits, output, NUM_SPLITS * NUM_RECORDS_PER_SPLIT)) {
......
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.flink.connector.base.source.reader.mocks;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* A {@link Source} implementation for unit tests of the base source connector implementation.
*/
public class MockBaseSource implements Source<Integer, MockSourceSplit, List<MockSourceSplit>> {
private static final long serialVersionUID = 4445067705639284175L;
private final int numSplits;
private final int numRecordsPerSplit;
private final int startingValue;
private final Boundedness boundedness;
public MockBaseSource(int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
this(numSplits, numRecordsPerSplit, 0, boundedness);
}
public MockBaseSource(int numSplits, int numRecordsPerSplit, int startingValue, Boundedness boundedness) {
this.numSplits = numSplits;
this.numRecordsPerSplit = numRecordsPerSplit;
this.startingValue = startingValue;
this.boundedness = boundedness;
}
@Override
public Boundedness getBoundedness() {
return boundedness;
}
@Override
public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
FutureNotifier futureNotifier = new FutureNotifier();
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
new FutureCompletingBlockingQueue<>(futureNotifier);
Configuration config = new Configuration();
config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
return new MockSourceReader(
futureNotifier,
elementsQueue,
() -> new MockSplitReader(2, true, true),
config,
readerContext);
}
@Override
public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> createEnumerator(
SplitEnumeratorContext<MockSourceSplit> enumContext) {
List<MockSourceSplit> splits = new ArrayList<>();
for (int i = 0; i < numSplits; i++) {
int endIndex = boundedness == Boundedness.BOUNDED ? numRecordsPerSplit : Integer.MAX_VALUE;
MockSourceSplit split = new MockSourceSplit(i, 0, endIndex);
for (int j = 0; j < numRecordsPerSplit; j++) {
split.addRecord(startingValue + i * numRecordsPerSplit + j);
}
splits.add(split);
}
return new MockSplitEnumerator(splits, enumContext);
}
@Override
public SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> restoreEnumerator(
SplitEnumeratorContext<MockSourceSplit> enumContext,
List<MockSourceSplit> checkpoint) throws IOException {
return new MockSplitEnumerator(checkpoint, enumContext);
}
@Override
public SimpleVersionedSerializer<MockSourceSplit> getSplitSerializer() {
return new MockSourceSplitSerializer();
}
@Override
public SimpleVersionedSerializer<List<MockSourceSplit>> getEnumeratorCheckpointSerializer() {
return new SimpleVersionedSerializer<List<MockSourceSplit>>() {
@Override
public int getVersion() {
return 0;
}
@Override
public byte[] serialize(List<MockSourceSplit> obj) throws IOException {
return InstantiationUtil.serializeObject(obj.toArray());
}
@Override
public List<MockSourceSplit> deserialize(int version, byte[] serialized) throws IOException {
MockSourceSplit[] splitArray;
try {
splitArray = InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("Failed to deserialize the source split.");
}
return new ArrayList<>(Arrays.asList(splitArray));
}
};
}
}
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.apache.flink.connector.base.source.reader.mocks;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* A mock {@link SplitEnumerator} for unit tests.
*/
public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, List<MockSourceSplit>> {
private final List<MockSourceSplit> splits;
private final SplitEnumeratorContext<MockSourceSplit> context;
public MockSplitEnumerator(
List<MockSourceSplit> splits,
SplitEnumeratorContext<MockSourceSplit> context) {
this.splits = splits;
this.context = context;
}
@Override
public void start() {
}
@Override
public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
}
@Override
public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
this.splits.addAll(splits);
}
@Override
public void addReader(int subtaskId) {
if (context.registeredReaders().size() == context.currentParallelism()) {
int numReaders = context.registeredReaders().size();
Map<Integer, List<MockSourceSplit>> assignment = new HashMap<>();
for (int i = 0; i < splits.size(); i++) {
assignment
.computeIfAbsent(i % numReaders, t -> new ArrayList<>())
.add(splits.get(i));
}
context.assignSplits(new SplitsAssignment<>(assignment));
splits.clear();
for (int i = 0; i < numReaders; i++) {
context.sendEventToSourceReader(i, new NoMoreSplitsEvent());
}
}
}
@Override
public List<MockSourceSplit> snapshotState() {
return splits;
}
@Override
public void close() throws IOException {
}
}
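For concreteness, a hedged mini-demo of the round-robin assignment computed in addReader() above, using plain Java collections instead of Flink types:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Map<Integer, List<String>> assignment = new HashMap<>();
List<String> splits = Arrays.asList("s0", "s1", "s2", "s3");
int numReaders = 2;
for (int i = 0; i < splits.size(); i++) {
    assignment.computeIfAbsent(i % numReaders, t -> new ArrayList<>()).add(splits.get(i));
}
System.out.println(assignment); // prints {0=[s0, s2], 1=[s1, s3]}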
......@@ -90,11 +90,11 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
int[] record = split.getNext(blockingFetch);
if (record != null) {
records.add(entry.getKey(), record);
if (split.isFinished()) {
records.addFinishedSplit(entry.getKey());
}
}
}
if (split.isFinished()) {
records.addFinishedSplit(entry.getKey());
}
}
} catch (InterruptedException ie) {
// Catch the exception and return the records that are already read.
......
......@@ -20,7 +20,6 @@ package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.Public;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.CompletableFuture;
......@@ -32,7 +31,7 @@ import java.util.concurrent.CompletableFuture;
* @param <SplitT> The type of the source splits.
*/
@Public
public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {
public interface SourceReader<T, SplitT extends SourceSplit> extends AutoCloseable {
/**
* Start the reader.
......
......@@ -49,7 +49,7 @@ class StreamExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection',
'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource',
'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener',
'clearJobListeners', 'getJobListeners'}
'clearJobListeners', 'getJobListeners', "continuousSource"}
if __name__ == '__main__':
......
......@@ -20,8 +20,11 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.operators.util.OperatorValidationUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
/**
......@@ -37,7 +40,7 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
public DataStreamSource(StreamExecutionEnvironment environment,
TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
boolean isParallel, String sourceName) {
super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
super(environment, new LegacySourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
this.isParallel = isParallel;
if (!isParallel) {
......@@ -50,6 +53,19 @@ public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {
this.isParallel = true;
}
public DataStreamSource(
StreamExecutionEnvironment environment,
Source<T, ?, ?> source,
TypeInformation<T> outTypeInfo,
String sourceName) {
super(environment,
new SourceTransformation<>(
sourceName,
new SourceOperatorFactory<>(source),
outTypeInfo,
environment.getParallelism()));
}
@Override
public DataStreamSource<T> setParallelism(int parallelism) {
OperatorValidationUtils.validateParallelism(parallelism, isParallel);
......
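The new constructor is the Source counterpart of the legacy one: instead of wrapping a StreamSource operator in a LegacySourceTransformation, it wraps the Source in a SourceOperatorFactory and builds a SourceTransformation. A hedged long-hand sketch of what it assembles, where env stands for a StreamExecutionEnvironment and all types come from this patch (MockSource is the test source used later):

SourceTransformation<Integer> transformation = new SourceTransformation<>(
        "TestingSource",
        new SourceOperatorFactory<>(new MockSource(Boundedness.BOUNDED, 1)),
        BasicTypeInfo.INT_TYPE_INFO,
        env.getParallelism());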
......@@ -32,6 +32,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.Utils;
......@@ -1591,25 +1592,52 @@ public class StreamExecutionEnvironment {
@SuppressWarnings("unchecked")
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
if (function instanceof ResultTypeQueryable) {
typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
}
if (typeInfo == null) {
try {
typeInfo = TypeExtractor.createTypeInfo(
SourceFunction.class,
function.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}
TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
boolean isParallel = function instanceof ParallelSourceFunction;
clean(function);
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
}
/**
* Add a data {@link Source} to the environment to get a {@link DataStream}.
*
* @param source
* the user defined source
* @param sourceName
* Name of the data source
* @param <OUT>
* type of the returned stream
* @return the data stream constructed
*/
@PublicEvolving
public <OUT> DataStreamSource<OUT> continuousSource(Source<OUT, ?, ?> source, String sourceName) {
return continuousSource(source, sourceName, null);
}
/**
* Add a data {@link Source} to the environment to get a {@link DataStream}.
*
* @param source
* the user defined source
* @param sourceName
* Name of the data source
* @param <OUT>
* type of the returned stream
* @param typeInfo
* the user defined type information for the stream
* @return the data stream constructed
*/
@PublicEvolving
public <OUT> DataStreamSource<OUT> continuousSource(
Source<OUT, ?, ?> source,
String sourceName,
TypeInformation<OUT> typeInfo) {
TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(source, sourceName, Source.class, typeInfo);
return new DataStreamSource<>(this, source, resolvedTypeInfo, sourceName);
}
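A hedged usage sketch of the two overloads; MockSource and BasicTypeInfo come from the tests in this patch. Note that, per getTypeInfo() below, a source implementing ResultTypeQueryable overrides even an explicitly passed type.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Overload 1: the element type is extracted from the Source class.
DataStream<Integer> a = env.continuousSource(
        new MockSource(Boundedness.BOUNDED, 1), "SourceA");
// Overload 2: the element type is supplied explicitly, skipping extraction.
DataStream<Integer> b = env.continuousSource(
        new MockSource(Boundedness.BOUNDED, 1), "SourceB", BasicTypeInfo.INT_TYPE_INFO);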
/**
......@@ -2106,4 +2134,27 @@ public class StreamExecutionEnvironment {
public void registerCachedFile(String filePath, String name, boolean executable) {
this.cacheFile.add(new Tuple2<>(name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
}
// Private helpers.
@SuppressWarnings("unchecked")
private <OUT, T extends TypeInformation<OUT>> T getTypeInfo(
Object source,
String sourceName,
Class<?> baseSourceClass,
TypeInformation<OUT> typeInfo) {
TypeInformation<OUT> resolvedTypeInfo = typeInfo;
if (source instanceof ResultTypeQueryable) {
resolvedTypeInfo = ((ResultTypeQueryable<OUT>) source).getProducedType();
}
if (resolvedTypeInfo == null) {
try {
resolvedTypeInfo = TypeExtractor.createTypeInfo(
baseSourceClass,
source.getClass(), 0, null, null);
} catch (final InvalidTypesException e) {
resolvedTypeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
}
}
return (T) resolvedTypeInfo;
}
}
......@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
......@@ -47,6 +48,7 @@ import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.MultipleInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
......@@ -234,24 +236,46 @@ public class StreamGraph implements Pipeline {
return !vertexIDtoLoopTimeout.isEmpty();
}
public <IN, OUT> void addSource(Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
public <IN, OUT> void addSource(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
SourceOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addOperator(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
inTypeInfo,
outTypeInfo,
operatorName,
SourceOperatorStreamTask.class);
sources.add(vertexID);
}
public <IN, OUT> void addLegacySource(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
sources.add(vertexID);
}
public <IN, OUT> void addSink(Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
public <IN, OUT> void addSink(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo, outTypeInfo, operatorName);
sinks.add(vertexID);
}
......@@ -264,13 +288,23 @@ public class StreamGraph implements Pipeline {
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
Class<? extends AbstractInvokable> invokableClass =
operatorFactory.isStreamSource() ? SourceStreamTask.class : OneInputStreamTask.class;
addOperator(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, inTypeInfo,
outTypeInfo, operatorName, invokableClass);
}
if (operatorFactory.isStreamSource()) {
addNode(vertexID, slotSharingGroup, coLocationGroup, SourceStreamTask.class, operatorFactory, operatorName);
} else {
addNode(vertexID, slotSharingGroup, coLocationGroup, OneInputStreamTask.class, operatorFactory, operatorName);
}
private <IN, OUT> void addOperator(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<OUT> operatorFactory,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName,
Class<? extends AbstractInvokable> invokableClass) {
addNode(vertexID, slotSharingGroup, coLocationGroup, invokableClass, operatorFactory, operatorName);
setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));
if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
......@@ -340,25 +374,26 @@ public class StreamGraph implements Pipeline {
}
}
protected StreamNode addNode(Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperatorFactory<?> operatorFactory,
String operatorName) {
protected StreamNode addNode(
Integer vertexID,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperatorFactory<?> operatorFactory,
String operatorName) {
if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
StreamNode vertex = new StreamNode(
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
new ArrayList<OutputSelector<?>>(),
vertexClass);
vertexID,
slotSharingGroup,
coLocationGroup,
operatorFactory,
operatorName,
new ArrayList<OutputSelector<?>>(),
vertexClass);
streamNodes.put(vertexID, vertex);
......
......@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTrans
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
......@@ -254,8 +255,10 @@ public class StreamGraphGenerator {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof AbstractMultipleInputTransformation<?>) {
transformedIds = transformMultipleInputTransform((AbstractMultipleInputTransformation<?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
} else if (transform instanceof SourceTransformation) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof LegacySourceTransformation<?>) {
transformedIds = transformLegacySource((LegacySourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
......@@ -584,6 +587,26 @@ public class StreamGraphGenerator {
null,
source.getOutputType(),
"Source: " + source.getName());
int parallelism = source.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
source.getParallelism() : executionConfig.getParallelism();
streamGraph.setParallelism(source.getId(), parallelism);
streamGraph.setMaxParallelism(source.getId(), source.getMaxParallelism());
return Collections.singleton(source.getId());
}
/**
* Transforms a {@code LegacySourceTransformation}.
*/
private <T> Collection<Integer> transformLegacySource(LegacySourceTransformation<T> source) {
String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), Collections.emptyList());
streamGraph.addLegacySource(source.getId(),
slotSharingGroup,
source.getCoLocationGroupKey(),
source.getOperatorFactory(),
null,
source.getOutputType(),
"Source: " + source.getName());
if (source.getOperatorFactory() instanceof InputFormatOperatorFactory) {
streamGraph.setInputFormat(source.getId(),
((InputFormatOperatorFactory<T>) source.getOperatorFactory()).getInputFormat());
......@@ -596,7 +619,7 @@ public class StreamGraphGenerator {
}
/**
* Transforms a {@code SourceTransformation}.
* Transforms a {@code SinkTransformation}.
*/
private <T> Collection<Integer> transformSink(SinkTransformation<T> sink) {
......
......@@ -25,8 +25,11 @@ import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
......@@ -36,6 +39,7 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import static org.apache.flink.util.Preconditions.checkArgument;
......@@ -94,14 +98,13 @@ public class StreamNode implements Serializable {
}
public StreamNode(
Integer id,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<?> operatorFactory,
String operatorName,
List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
Integer id,
@Nullable String slotSharingGroup,
@Nullable String coLocationGroup,
StreamOperatorFactory<?> operatorFactory,
String operatorName,
List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
this.id = id;
this.operatorName = operatorName;
this.operatorFactory = operatorFactory;
......@@ -336,6 +339,17 @@ public class StreamNode implements Serializable {
this.userHash = userHash;
}
public Optional<OperatorCoordinator.Provider> getCoordinatorProvider(
String operatorName,
OperatorID operatorID) {
if (operatorFactory instanceof CoordinatedOperatorFactory) {
return Optional.of(((CoordinatedOperatorFactory) operatorFactory)
.getCoordinatorProvider(operatorName, operatorID));
} else {
return Optional.empty();
}
}
@Override
public boolean equals(Object o) {
if (this == o) {
......
......@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalPipelinedRegion;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
......@@ -165,9 +166,7 @@ public class StreamingJobGraphGenerator {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
setChaining(hashes, legacyHashes, chainedOperatorHashes);
setChaining(hashes, legacyHashes);
setPhysicalEdges();
......@@ -251,20 +250,17 @@ public class StreamingJobGraphGenerator {
*
* <p>This will recursively create all {@link JobVertex} instances.
*/
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
createChain(
sourceNodeId,
0,
new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
}
}
private List<StreamEdge> createChain(
Integer startNodeId,
Integer currentNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
int chainIndex,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, OperatorChainInfo chainInfo) {
Integer startNodeId = chainInfo.getStartNodeId();
if (!builtVertices.contains(startNodeId)) {
List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
......@@ -284,28 +280,20 @@ public class StreamingJobGraphGenerator {
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(
createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
}
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
}
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
OperatorID currentOperatorId = new OperatorID(primaryHashBytes);
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
}
chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));
if (currentNode.getInputFormat() != null) {
getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
}
......@@ -315,7 +303,7 @@ public class StreamingJobGraphGenerator {
}
StreamConfig config = currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
......@@ -393,14 +381,12 @@ public class StreamingJobGraphGenerator {
private StreamConfig createJobVertex(
Integer streamNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
OperatorChainInfo chainInfo) {
JobVertex jobVertex;
StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
byte[] hash = hashes.get(streamNodeId);
byte[] hash = chainInfo.getHash(streamNodeId);
if (hash == null) {
throw new IllegalStateException("Cannot find node hash. " +
......@@ -409,7 +395,7 @@ public class StreamingJobGraphGenerator {
JobVertexID jobVertexId = new JobVertexID(hash);
List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
List<Tuple2<byte[], byte[]>> chainedOperators = chainInfo.getChainedOperatorHashes(streamNodeId);
List<OperatorIDPair> operatorIDPairs = new ArrayList<>();
if (chainedOperators != null) {
for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
......@@ -434,6 +420,15 @@ public class StreamingJobGraphGenerator {
operatorIDPairs);
}
for (OperatorCoordinator.Provider coordinatorProvider : chainInfo.getCoordinatorProviders()) {
try {
jobVertex.addOperatorCoordinator(new SerializedValue<>(coordinatorProvider));
} catch (IOException e) {
throw new FlinkRuntimeException(String.format(
"Coordinator Provider for node %s is not serializable.", chainedNames.get(streamNodeId)));
}
}
jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));
jobVertex.setInvokableClass(streamNode.getJobVertexClass());
......@@ -973,4 +968,67 @@ public class StreamingJobGraphGenerator {
jobGraph.setSnapshotSettings(settings);
}
/**
* A private class to help maintain the information of an operator chain during the recursive call in
* {@link #createChain(Integer, int, OperatorChainInfo)}.
*/
private static class OperatorChainInfo {
private final Integer startNodeId;
private final Map<Integer, byte[]> hashes;
private final List<Map<Integer, byte[]>> legacyHashes;
private final Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes;
private final List<OperatorCoordinator.Provider> coordinatorProviders;
private final StreamGraph streamGraph;
private OperatorChainInfo(
int startNodeId,
Map<Integer, byte[]> hashes,
List<Map<Integer, byte[]>> legacyHashes,
StreamGraph streamGraph) {
this.startNodeId = startNodeId;
this.hashes = hashes;
this.legacyHashes = legacyHashes;
this.chainedOperatorHashes = new HashMap<>();
this.coordinatorProviders = new ArrayList<>();
this.streamGraph = streamGraph;
}
byte[] getHash(Integer streamNodeId) {
return hashes.get(streamNodeId);
}
private Integer getStartNodeId() {
return startNodeId;
}
private List<Tuple2<byte[], byte[]>> getChainedOperatorHashes(int startNodeId) {
return chainedOperatorHashes.get(startNodeId);
}
private List<OperatorCoordinator.Provider> getCoordinatorProviders() {
return coordinatorProviders;
}
private OperatorID addNodeToChain(int currentNodeId, String operatorName) {
List<Tuple2<byte[], byte[]>> operatorHashes =
chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
byte[] primaryHashBytes = hashes.get(currentNodeId);
for (Map<Integer, byte[]> legacyHash : legacyHashes) {
operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
}
streamGraph
.getStreamNode(currentNodeId)
.getCoordinatorProvider(operatorName, new OperatorID(getHash(currentNodeId)))
.ifPresent(coordinatorProviders::add);
return new OperatorID(primaryHashBytes);
}
private OperatorChainInfo newChain(Integer startNodeId) {
return new OperatorChainInfo(startNodeId, hashes, legacyHashes, streamGraph);
}
}
}
......@@ -64,6 +64,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
public class SourceOperator<OUT, SplitT extends SourceSplit>
extends AbstractStreamOperator<OUT>
implements OperatorEventHandler, PushingAsyncDataInput<OUT> {
private static final long serialVersionUID = 1405537676017904695L;
// Package private for unit test.
static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.transformations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSource;
import java.util.Collection;
import java.util.Collections;
/**
* This represents a Source. This does not actually transform anything since it has no inputs but
* it is the root {@code Transformation} of any topology.
*
* @param <T> The type of the elements that this source produces
*/
@Internal
public class LegacySourceTransformation<T> extends PhysicalTransformation<T> {
private final StreamOperatorFactory<T> operatorFactory;
/**
* Creates a new {@code LegacySourceTransformation} from the given operator.
*
* @param name The name of the {@code LegacySourceTransformation}, this will be shown in Visualizations and the Log
* @param operator The {@code StreamSource} that is the operator of this Transformation
* @param outputType The type of the elements produced by this {@code LegacySourceTransformation}
* @param parallelism The parallelism of this {@code LegacySourceTransformation}
*/
public LegacySourceTransformation(
String name,
StreamSource<T, ?> operator,
TypeInformation<T> outputType,
int parallelism) {
this(name, SimpleOperatorFactory.of(operator), outputType, parallelism);
}
public LegacySourceTransformation(
String name,
StreamOperatorFactory<T> operatorFactory,
TypeInformation<T> outputType,
int parallelism) {
super(name, outputType, parallelism);
this.operatorFactory = operatorFactory;
}
@VisibleForTesting
public StreamSource<T, ?> getOperator() {
return (StreamSource<T, ?>) ((SimpleOperatorFactory) operatorFactory).getOperator();
}
/**
* Returns the {@code StreamOperatorFactory} of this {@code LegacySourceTransformation}.
*/
public StreamOperatorFactory<T> getOperatorFactory() {
return operatorFactory;
}
@Override
public Collection<Transformation<?>> getTransitivePredecessors() {
return Collections.singleton(this);
}
@Override
public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.transformations;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import java.util.Collection;
import java.util.Collections;
/**
* This represents a Source. This does not actually transform anything since it has no inputs but
* it is the root {@code Transformation} of any topology.
*
* @param <T> The type of the elements that this source produces
* A {@link PhysicalTransformation} for {@link Source}.
*/
@Internal
public class SourceTransformation<T> extends PhysicalTransformation<T> {
private final StreamOperatorFactory<T> operatorFactory;
public class SourceTransformation<OUT> extends PhysicalTransformation<OUT> {
private final SourceOperatorFactory<OUT> sourceFactory;
/**
* Creates a new {@code SourceTransformation} from the given operator.
* Creates a new {@code Transformation} with the given name, output type and parallelism.
*
* @param name The name of the {@code SourceTransformation}, this will be shown in Visualizations and the Log
* @param operator The {@code StreamSource} that is the operator of this Transformation
* @param outputType The type of the elements produced by this {@code SourceTransformation}
* @param parallelism The parallelism of this {@code SourceTransformation}
* @param name The name of the {@code Transformation}, this will be shown in Visualizations and the Log
* @param sourceFactory The operator factory for {@link SourceOperator}.
* @param outputType The output type of this {@code Transformation}
* @param parallelism The parallelism of this {@code Transformation}
*/
public SourceTransformation(
String name,
StreamSource<T, ?> operator,
TypeInformation<T> outputType,
int parallelism) {
this(name, SimpleOperatorFactory.of(operator), outputType, parallelism);
}
public SourceTransformation(
String name,
StreamOperatorFactory<T> operatorFactory,
TypeInformation<T> outputType,
SourceOperatorFactory<OUT> sourceFactory,
TypeInformation<OUT> outputType,
int parallelism) {
super(name, outputType, parallelism);
this.operatorFactory = operatorFactory;
}
@VisibleForTesting
public StreamSource<T, ?> getOperator() {
return (StreamSource<T, ?>) ((SimpleOperatorFactory) operatorFactory).getOperator();
this.sourceFactory = sourceFactory;
}
/**
* Returns the {@code StreamOperatorFactory} of this {@code SourceTransformation}.
* Returns the {@code SourceOperatorFactory} of this {@code SourceTransformation}.
*/
public StreamOperatorFactory<T> getOperatorFactory() {
return operatorFactory;
public SourceOperatorFactory<OUT> getOperatorFactory() {
return sourceFactory;
}
@Override
......@@ -85,6 +66,6 @@ public class SourceTransformation<T> extends PhysicalTransformation<T> {
@Override
public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
sourceFactory.setChainingStrategy(strategy);
}
}
......@@ -28,6 +28,9 @@ import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
......@@ -43,6 +46,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
......@@ -56,18 +60,27 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
......@@ -234,6 +247,26 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
assertTrue(printConfig.isChainEnd());
}
@Test
public void testOperatorCoordinatorAddedToJobVertex() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> stream =
env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestingSource");
OneInputTransformation<Integer, Integer> resultTransform = new OneInputTransformation<Integer, Integer>(
stream.getTransformation(),
"AnyName",
new CoordinatedTransformOperatorFactory(),
BasicTypeInfo.INT_TYPE_INFO,
env.getParallelism());
new TestingSingleOutputStreamOperator<>(env, resultTransform).print();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
assertEquals(2, jobGraph.getVerticesAsArray()[0].getOperatorCoordinators().size());
}
/**
* Verifies that the resources are merged correctly for chained operators (covers source and sink cases)
* when generating job graph.
......@@ -422,6 +455,30 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
assertTrue(sinkFormat2 instanceof DiscardingOutputFormat);
}
@Test
public void testCoordinatedOperator() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> source =
env.continuousSource(new MockSource(Boundedness.BOUNDED, 1), "TestSource");
source.addSink(new DiscardingSink<>());
StreamGraph streamGraph = env.getStreamGraph();
JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
// There should be only one job vertex.
assertEquals(1, jobGraph.getNumberOfVertices());
JobVertex jobVertex = jobGraph.getVerticesAsArray()[0];
List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders = jobVertex.getOperatorCoordinators();
// There should be only one coordinator provider.
assertEquals(1, coordinatorProviders.size());
// The invokable class should be SourceOperatorStreamTask.
final ClassLoader classLoader = getClass().getClassLoader();
assertEquals(SourceOperatorStreamTask.class, jobVertex.getInvokableClass(classLoader));
StreamOperatorFactory operatorFactory =
new StreamConfig(jobVertex.getConfiguration()).getStreamOperatorFactory(classLoader);
assertTrue(operatorFactory instanceof SourceOperatorFactory);
}
/**
* Test setting shuffle mode to {@link ShuffleMode#PIPELINED}.
*/
......@@ -890,4 +947,42 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
}
}
// ------------ private classes -------------
private static class CoordinatedTransformOperatorFactory
extends AbstractStreamOperatorFactory<Integer>
implements CoordinatedOperatorFactory<Integer>, OneInputStreamOperatorFactory<Integer, Integer> {
@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String operatorName, OperatorID operatorID) {
return new OperatorCoordinator.Provider() {
@Override
public OperatorID getOperatorId() {
return null;
}
@Override
public OperatorCoordinator create(OperatorCoordinator.Context context) {
return null;
}
};
}
@Override
public <T extends StreamOperator<Integer>> T createStreamOperator(StreamOperatorParameters<Integer> parameters) {
return null;
}
@Override
public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
return null;
}
}
private static class TestingSingleOutputStreamOperator<OUT> extends SingleOutputStreamOperator<OUT> {
public TestingSingleOutputStreamOperator(StreamExecutionEnvironment environment,
Transformation<OUT> transformation) {
super(environment, transformation);
}
}
}
......@@ -23,6 +23,7 @@ import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.connector.source.{Source, SourceSplit}
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
......@@ -660,6 +661,15 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
addSource(sourceFunction)
}
/**
* Create a DataStream using a [[Source]].
*/
def continuousSource[T: TypeInformation](
source: Source[T, _ <: SourceSplit, _],
sourceName: String): DataStream[T] = {
asScalaStream(javaEnv.continuousSource(source, sourceName, implicitly[TypeInformation[T]]))
}
/**
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
......
......@@ -79,7 +79,7 @@ class StreamExecTableSourceScan(
name: String,
outTypeInfo: RowDataTypeInfo): Transformation[RowData] = {
// It's better to use StreamExecutionEnvironment.createInput()
// rather than addSource() for streaming, because it take care of checkpoint.
// rather than addLegacySource() for streaming, because it takes care of checkpointing.
env
.createInput(inputFormat, outTypeInfo)
.name(name)
......
......@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.util.TestLogger;
......@@ -47,7 +47,7 @@ public class BatchExecutorTest extends TestLogger {
public BatchExecutorTest() {
batchExecutor = new BatchExecutor(LocalStreamEnvironment.getExecutionEnvironment());
final Transformation testTransform = new SourceTransformation<>(
final Transformation testTransform = new LegacySourceTransformation<>(
"MockTransform",
new StreamSource<>(new SourceFunction<String>() {
@Override
......
......@@ -366,7 +366,7 @@ class HarnessTestBase extends StreamingWithStateTestBase {
}
case union: UnionTransformation[_] => extractFromInputs(union.getInputs.toSeq: _*)
case p: PartitionTransformation[_] => extractFromInputs(p.getInput)
case _: SourceTransformation[_] => null
case _: LegacySourceTransformation[_] => null
case _ => throw new UnsupportedOperationException("This should not happen.")
}
}
......