提交 c40cfd31 编写于 作者: A Aljoscha Krettek 提交者: Till Rohrmann

[FLINK-3315] Fix Slot Sharing in Streaming API

This changes the slot sharing settings to a single method,
slotSharingGroup(String), on DataStream.

Operations inherit the slot sharing group of the input if all input
operations are in the same slot sharing group.

The default slot sharing group is "default". It can also be set explicitly
using slotSharingGroup("default"), which overrides the inheriting
behaviour.

This closes #1641.
上级 dc4d147d
......@@ -103,4 +103,23 @@ public class DataStreamSink<T> {
this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
return this;
}
/**
 * Assigns this sink to the given slot sharing group.
 *
 * <p>Parallel instances of operations that belong to the same slot sharing group are
 * co-located in the same TaskManager slot, if possible.
 *
 * <p>If no group is specified explicitly, an operation takes over the slot sharing group
 * of its input operations, provided that all of them are in the same group. Initially an
 * operation is in the default group; it can be placed there explicitly by passing
 * {@code "default"}.
 *
 * @param slotSharingGroup The slot sharing group name.
 * @return This sink, so that further calls can be chained.
 */
@PublicEvolving
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
    this.transformation.setSlotSharingGroup(slotSharingGroup);
    return this;
}
}
......@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
......@@ -313,40 +312,21 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
}
/**
* By default all operators in a streaming job share the same resource
* group. Each resource group takes as many task manager slots as the
* maximum parallelism operator in that group. Task chaining is only
* possible within one resource group. By calling this method, this
* operators starts a new resource group and all subsequent operators will
* be added to this group unless specified otherwise.
* <p> Please note that
* local executions have by default as many available task slots as the
* environment parallelism, so in order to start a new resource group the
* degree of parallelism for the operators must be decreased from the
* default.
*
* @return The operator as a part of a new resource group.
*/
@PublicEvolving
public SingleOutputStreamOperator<T, O> startNewResourceGroup() {
transformation.setResourceStrategy(ResourceStrategy.NEWGROUP);
return this;
}
/**
* Isolates the operator in its own resource group. This will cause the
* operator to grab as many task slots as its degree of parallelism. If
* there are no free resources available, the job will fail to start. It
* also disables chaining for this operator.
* <p>All subsequent operators are
* assigned to the default resource group.
*
* @return The operator with isolated resource group.
* Sets the slot sharing group of this operation. Parallel instances of
* operations that are in the same slot sharing group will be co-located in the same
* TaskManager slot, if possible.
*
* <p>Operations inherit the slot sharing group of input operations if all input operations
* are in the same slot sharing group and no slot sharing group was explicitly specified.
*
* <p>Initially an operation is in the default slot sharing group. An operation can be put into
* the default group explicitly by setting the slot sharing group to {@code "default"}.
*
* @param slotSharingGroup The slot sharing group name.
*/
@PublicEvolving
public SingleOutputStreamOperator<T, O> isolateResources() {
transformation.setResourceStrategy(ResourceStrategy.ISOLATE);
public SingleOutputStreamOperator<T, O> slotSharingGroup(String slotSharingGroup) {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
}
......@@ -154,31 +154,40 @@ public class StreamGraph extends StreamingPlan {
return!vertexIDtoLoopTimeout.isEmpty();
}
public <IN, OUT> void addSource(Integer vertexID, StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
public <IN, OUT> void addSource(Integer vertexID,
String slotSharingGroup,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
sources.add(vertexID);
}
public <IN, OUT> void addSink(Integer vertexID, StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
public <IN, OUT> void addSink(Integer vertexID,
String slotSharingGroup,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
sinks.add(vertexID);
}
public <IN, OUT> void addOperator(
Integer vertexID,
String slotSharingGroup,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
if (operatorObject instanceof StoppableStreamSource) {
addNode(vertexID, StoppableSourceStreamTask.class, operatorObject, operatorName);
addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
} else if (operatorObject instanceof StreamSource) {
addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName);
addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
} else {
addNode(vertexID, OneInputStreamTask.class, operatorObject, operatorName);
addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
}
TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
......@@ -206,13 +215,14 @@ public class StreamGraph extends StreamingPlan {
public <IN1, IN2, OUT> void addCoOperator(
Integer vertexID,
String slotSharingGroup,
TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject,
TypeInformation<IN1> in1TypeInfo,
TypeInformation<IN2> in2TypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, operatorName);
addNode(vertexID, slotSharingGroup, TwoInputStreamTask.class, taskOperatorObject, operatorName);
TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ?
outTypeInfo.createSerializer(executionConfig) : null;
......@@ -231,15 +241,23 @@ public class StreamGraph extends StreamingPlan {
}
}
protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
StreamOperator<?> operatorObject, String operatorName) {
protected StreamNode addNode(Integer vertexID,
String slotSharingGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperator<?> operatorObject,
String operatorName) {
if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}
StreamNode vertex = new StreamNode(environemnt, vertexID, operatorObject, operatorName,
new ArrayList<OutputSelector<?>>(), vertexClass);
StreamNode vertex = new StreamNode(environemnt,
vertexID,
slotSharingGroup,
operatorObject,
operatorName,
new ArrayList<OutputSelector<?>>(),
vertexClass);
streamNodes.put(vertexID, vertex);
......@@ -288,6 +306,22 @@ public class StreamGraph extends StreamingPlan {
new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
}
/**
 * Looks up the slot sharing group of the operation with the given ID, resolving virtual
 * nodes on the way.
 *
 * <p>Virtual select and virtual partition nodes carry no group of their own. For those the
 * lookup continues with the ID they were derived from until an actual {@code StreamNode}
 * is reached.
 *
 * @param id The ID of a stream node or of a virtual select/partition node.
 * @return The slot sharing group stored on the resolved stream node.
 */
public String getSlotSharingGroup(Integer id) {
    if (virtualSelectNodes.containsKey(id)) {
        // virtual select node: continue with the node it was created on
        return getSlotSharingGroup(virtualSelectNodes.get(id).f0);
    }
    if (virtuaPartitionNodes.containsKey(id)) {
        // virtual partition node: continue with the node it was created on
        return getSlotSharingGroup(virtuaPartitionNodes.get(id).f0);
    }
    return getStreamNode(id).getSlotSharingGroup();
}
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(upStreamVertexID,
downStreamVertexID,
......@@ -414,24 +448,6 @@ public class StreamGraph extends StreamingPlan {
getStreamNode(vertexID).setInputFormat(inputFormat);
}
public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
StreamNode node = getStreamNode(vertexID);
if (node == null) {
return;
}
switch (strategy) {
case ISOLATE:
node.isolateSlot();
break;
case NEWGROUP:
node.startNewSlotSharingGroup();
break;
default:
throw new IllegalArgumentException("Unknown resource strategy");
}
}
void setTransformationId(Integer nodeId, String transformationId) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
......@@ -495,23 +511,23 @@ public class StreamGraph extends StreamingPlan {
public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
StreamNode source = this.addNode(sourceId,
StreamIterationHead.class,
null,
null);
null,
StreamIterationHead.class,
null,
"IterationSource-" + loopId);
sources.add(source.getId());
setParallelism(source.getId(), parallelism);
StreamNode sink = this.addNode(sinkId,
StreamIterationTail.class,
null,
null);
null,
StreamIterationTail.class,
null,
"IterationSink-" + loopId);
sinks.add(sink.getId());
setParallelism(sink.getId(), parallelism);
iterationSourceSinkPairs.add(new Tuple2<>(source, sink));
source.setOperatorName("IterationSource-" + loopId);
sink.setOperatorName("IterationSink-" + loopId);
this.vertexIDtoBrokerID.put(source.getId(), "broker-" + loopId);
this.vertexIDtoBrokerID.put(sink.getId(), "broker-" + loopId);
this.vertexIDtoLoopTimeout.put(source.getId(), timeout);
......
......@@ -181,9 +181,6 @@ public class StreamGraphGenerator {
if (transform.getBufferTimeout() > 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getResourceStrategy() != StreamGraph.ResourceStrategy.DEFAULT) {
streamGraph.setResourceStrategy(transform.getId(), transform.getResourceStrategy());
}
if (transform.getUid() != null) {
streamGraph.setTransformationId(transform.getId(), transform.getUid());
}
......@@ -302,14 +299,14 @@ public class StreamGraphGenerator {
List<Integer> resultIds = new ArrayList<>();
// first transform the input stream(s) and store the result IDs
resultIds.addAll(transform(input));
Collection<Integer> inputIds = transform(input);
resultIds.addAll(inputIds);
// the recursive transform might have already transformed this
if (alreadyTransformed.containsKey(iterate)) {
return alreadyTransformed.get(iterate);
}
// create the fake iteration source/sink pair
Tuple2<StreamNode, StreamNode> itSourceAndSink = streamGraph.createIterationSourceAndSink(
iterate.getId(),
......@@ -333,8 +330,12 @@ public class StreamGraphGenerator {
// the feedback edges and let them stop when encountering the iterate node
alreadyTransformed.put(iterate, resultIds);
// so that we can determine the slot sharing group from all feedback edges
List<Integer> allFeedbackIds = new ArrayList<>();
for (StreamTransformation<T> feedbackEdge : iterate.getFeedbackEdges()) {
Collection<Integer> feedbackIds = transform(feedbackEdge);
allFeedbackIds.addAll(feedbackIds);
for (Integer feedbackId: feedbackIds) {
streamGraph.addEdge(feedbackId,
itSink.getId(),
......@@ -343,6 +344,11 @@ public class StreamGraphGenerator {
}
}
String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
itSink.setSlotSharingGroup(slotSharingGroup);
itSource.setSlotSharingGroup(slotSharingGroup);
return resultIds;
}
......@@ -386,8 +392,12 @@ public class StreamGraphGenerator {
// the feedback edges and let them stop when encountering the iterate node
alreadyTransformed.put(coIterate, resultIds);
// so that we can determine the slot sharing group from all feedback edges
List<Integer> allFeedbackIds = new ArrayList<>();
for (StreamTransformation<F> feedbackEdge : coIterate.getFeedbackEdges()) {
Collection<Integer> feedbackIds = transform(feedbackEdge);
allFeedbackIds.addAll(feedbackIds);
for (Integer feedbackId: feedbackIds) {
streamGraph.addEdge(feedbackId,
itSink.getId(),
......@@ -396,6 +406,11 @@ public class StreamGraphGenerator {
}
}
String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
itSink.setSlotSharingGroup(slotSharingGroup);
itSource.setSlotSharingGroup(slotSharingGroup);
return Collections.singleton(itSource.getId());
}
......@@ -403,7 +418,9 @@ public class StreamGraphGenerator {
* Transforms a {@code SourceTransformation}.
*/
private <T> Collection<Integer> transformSource(SourceTransformation<T> source) {
String slotSharingGroup = determineSlotSharingGroup(source.getSlotSharingGroup(), new ArrayList<Integer>());
streamGraph.addSource(source.getId(),
slotSharingGroup,
source.getOperator(),
null,
source.getOutputType(),
......@@ -423,7 +440,10 @@ public class StreamGraphGenerator {
Collection<Integer> inputIds = transform(sink.getInput());
String slotSharingGroup = determineSlotSharingGroup(sink.getSlotSharingGroup(), inputIds);
streamGraph.addSink(sink.getId(),
slotSharingGroup,
sink.getOperator(),
sink.getInput().getOutputType(),
null,
......@@ -463,7 +483,10 @@ public class StreamGraphGenerator {
return alreadyTransformed.get(transform);
}
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
streamGraph.addOperator(transform.getId(),
slotSharingGroup,
transform.getOperator(),
transform.getInputType(),
transform.getOutputType(),
......@@ -500,8 +523,15 @@ public class StreamGraphGenerator {
return alreadyTransformed.get(transform);
}
List<Integer> allInputIds = new ArrayList<>();
allInputIds.addAll(inputIds1);
allInputIds.addAll(inputIds2);
String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), allInputIds);
streamGraph.addCoOperator(
transform.getId(),
slotSharingGroup,
transform.getOperator(),
transform.getInputType1(),
transform.getInputType2(),
......@@ -533,4 +563,31 @@ public class StreamGraphGenerator {
return Collections.singleton(transform.getId());
}
/**
 * Resolves the slot sharing group of an operation from the group set by the user and the
 * slot sharing groups of its inputs.
 *
 * <p>A group name specified by the user is taken as is. If nothing is specified and all
 * input operations have the same group name, that name is chosen. If the inputs disagree,
 * or if there are no inputs, the {@code "default"} group is chosen.
 *
 * <p>An input that reports no group ({@code null}) is passed over as long as no group has
 * been picked up yet; once a group has been picked up, such an input counts as a mismatch.
 *
 * @param specifiedGroup The group specified by the user, or {@code null} if none was set.
 * @param inputIds The IDs of the input operations.
 * @return The name of the slot sharing group to use for the operation.
 */
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
    if (specifiedGroup != null) {
        return specifiedGroup;
    }

    String commonGroup = null;
    for (int inputId : inputIds) {
        String candidate = streamGraph.getSlotSharingGroup(inputId);
        if (commonGroup == null) {
            commonGroup = candidate;
            continue;
        }
        if (!commonGroup.equals(candidate)) {
            // inputs are spread over different groups, nothing to inherit
            return "default";
        }
    }
    return commonGroup != null ? commonGroup : "default";
}
}
......@@ -37,16 +37,14 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
public class StreamNode implements Serializable {
private static final long serialVersionUID = 1L;
private static int currentSlotSharingIndex = 1;
transient private StreamExecutionEnvironment env;
private Integer id;
private final Integer id;
private Integer parallelism = null;
private Long bufferTimeout = null;
private String operatorName;
private Integer slotSharingID;
private boolean isolatedSlot = false;
private final String operatorName;
private String slotSharingGroup;
private KeySelector<?,?> statePartitioner1;
private KeySelector<?,?> statePartitioner2;
private TypeSerializer<?> stateKeySerializer;
......@@ -60,22 +58,26 @@ public class StreamNode implements Serializable {
private List<StreamEdge> inEdges = new ArrayList<StreamEdge>();
private List<StreamEdge> outEdges = new ArrayList<StreamEdge>();
private Class<? extends AbstractInvokable> jobVertexClass;
private final Class<? extends AbstractInvokable> jobVertexClass;
private InputFormat<?, ?> inputFormat;
private String transformationId;
public StreamNode(StreamExecutionEnvironment env, Integer id, StreamOperator<?> operator,
String operatorName, List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
public StreamNode(StreamExecutionEnvironment env,
Integer id,
String slotSharingGroup,
StreamOperator<?> operator,
String operatorName,
List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
this.env = env;
this.id = id;
this.operatorName = operatorName;
this.operator = operator;
this.outputSelectors = outputSelector;
this.jobVertexClass = jobVertexClass;
this.slotSharingID = currentSlotSharingIndex;
this.slotSharingGroup = slotSharingGroup;
}
public void addInEdge(StreamEdge inEdge) {
......@@ -158,10 +160,6 @@ public class StreamNode implements Serializable {
return operatorName;
}
public void setOperatorName(String operatorName) {
this.operatorName = operatorName;
}
public List<OutputSelector<?>> getOutputSelectors() {
return outputSelectors;
}
......@@ -206,18 +204,19 @@ public class StreamNode implements Serializable {
this.inputFormat = inputFormat;
}
public int getSlotSharingID() {
return isolatedSlot ? -1 : slotSharingID;
public void setSlotSharingGroup(String slotSharingGroup) {
this.slotSharingGroup = slotSharingGroup;
}
public void startNewSlotSharingGroup() {
this.slotSharingID = ++currentSlotSharingIndex;
public String getSlotSharingGroup() {
return slotSharingGroup;
}
public void isolateSlot() {
isolatedSlot = true;
/**
 * Checks whether this node and the given node are in the same slot sharing group.
 *
 * <p>Two nodes without a group (both {@code null}) are treated as being in the same group.
 *
 * @param downstreamVertex The node to compare against.
 * @return {@code true} if both nodes have equal slot sharing group names or both have none.
 */
public boolean isSameSlotSharingGroup(StreamNode downstreamVertex) {
    final String otherGroup = downstreamVertex.slotSharingGroup;
    if (slotSharingGroup == null) {
        return otherGroup == null;
    }
    return slotSharingGroup.equals(otherGroup);
}
@Override
public String toString() {
return operatorName + "-" + id;
......
......@@ -100,12 +100,12 @@ public class StreamingJobGraphGenerator {
}
private void init() {
this.jobVertices = new HashMap<Integer, JobVertex>();
this.builtVertices = new HashSet<Integer>();
this.chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>();
this.vertexConfigs = new HashMap<Integer, StreamConfig>();
this.chainedNames = new HashMap<Integer, String>();
this.physicalEdgesInOrder = new ArrayList<StreamEdge>();
this.jobVertices = new HashMap<>();
this.builtVertices = new HashSet<>();
this.chainedConfigs = new HashMap<>();
this.vertexConfigs = new HashMap<>();
this.chainedNames = new HashMap<>();
this.physicalEdgesInOrder = new ArrayList<>();
}
public JobGraph createJobGraph(String jobName) {
......@@ -149,7 +149,7 @@ public class StreamingJobGraphGenerator {
// create if not set
if (inEdges == null) {
inEdges = new ArrayList<StreamEdge>();
inEdges = new ArrayList<>();
physicalInEdgesInOrder.put(target, inEdges);
}
......@@ -403,8 +403,7 @@ public class StreamingJobGraphGenerator {
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
&& headOperator != null
&& upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID()
&& upStreamVertex.getSlotSharingID() != -1
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
......@@ -415,20 +414,18 @@ public class StreamingJobGraphGenerator {
private void setSlotSharing() {
Map<Integer, SlotSharingGroup> slotSharingGroups = new HashMap<>();
Map<String, SlotSharingGroup> slotSharingGroups = new HashMap<>();
for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
int slotSharingID = streamGraph.getStreamNode(entry.getKey()).getSlotSharingID();
String slotSharingGroup = streamGraph.getStreamNode(entry.getKey()).getSlotSharingGroup();
if (slotSharingID != -1) {
SlotSharingGroup group = slotSharingGroups.get(slotSharingID);
if (group == null) {
group = new SlotSharingGroup();
slotSharingGroups.put(slotSharingID, group);
}
entry.getValue().setSlotSharingGroup(group);
SlotSharingGroup group = slotSharingGroups.get(slotSharingGroup);
if (group == null) {
group = new SlotSharingGroup();
slotSharingGroups.put(slotSharingGroup, group);
}
entry.getValue().setSlotSharingGroup(group);
}
for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
......@@ -704,7 +701,7 @@ public class StreamingJobGraphGenerator {
if (LOG.isDebugEnabled()) {
String udfClassName = "";
if (node.getOperator() instanceof AbstractUdfStreamOperator) {
udfClassName = ((AbstractUdfStreamOperator) node.getOperator())
udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
.getUserFunction().getClass().getName();
}
......@@ -737,7 +734,7 @@ public class StreamingJobGraphGenerator {
hasher.putString(node.getOperatorName(), Charset.forName("UTF-8"));
if (node.getOperator() instanceof AbstractUdfStreamOperator) {
String udfClassName = ((AbstractUdfStreamOperator) node.getOperator())
String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator())
.getUserFunction().getClass().getName();
hasher.putString(udfClassName, Charset.forName("UTF-8"));
......
......@@ -27,7 +27,7 @@ import java.util.Collections;
import java.util.List;
/**
* This represents a feedback point in a topology. The type of the feedback elements must not match
* This represents a feedback point in a topology. The type of the feedback elements need not match
* the type of the upstream {@code StreamTransformation} because the only allowed operations
* after a {@code CoFeedbackTransformation} are
* {@link org.apache.flink.streaming.api.transformations.TwoInputTransformation TwoInputTransformations}.
......
......@@ -126,7 +126,7 @@ public abstract class StreamTransformation<T> {
protected long bufferTimeout = -1;
protected StreamGraph.ResourceStrategy resourceStrategy = StreamGraph.ResourceStrategy.DEFAULT;
private String slotSharingGroup;
/**
* Creates a new {@code StreamTransformation} with the given name, output type and parallelism.
......@@ -140,6 +140,7 @@ public abstract class StreamTransformation<T> {
this.name = Preconditions.checkNotNull(name);
this.outputType = outputType;
this.parallelism = parallelism;
this.slotSharingGroup = null;
}
/**
......@@ -203,6 +204,29 @@ public abstract class StreamTransformation<T> {
return uid;
}
/**
* Returns the slot sharing group of this transformation.
*
* @see #setSlotSharingGroup(String)
*/
public String getSlotSharingGroup() {
return slotSharingGroup;
}
/**
* Sets the slot sharing group of this transformation. Parallel instances of operations that
* are in the same slot sharing group will be co-located in the same TaskManager slot, if
* possible.
*
* <p>Initially, an operation is in the default slot sharing group. This can be explicitly
* set using {@code setSlotSharingGroup("default")}.
*
* @param slotSharingGroup The slot sharing group name.
*/
public void setSlotSharingGroup(String slotSharingGroup) {
this.slotSharingGroup = slotSharingGroup;
}
/**
* Tries to fill in the type information. Type information can be filled in
* later when the program uses a type hint. This method checks whether the
......@@ -273,25 +297,6 @@ public abstract class StreamTransformation<T> {
return bufferTimeout;
}
/**
* Sets the {@link org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy} of this
* {@code StreamTransformation}. The resource strategy is used when scheduling operations on actual
* workers when transforming the StreamTopology to an
* {@link org.apache.flink.runtime.executiongraph.ExecutionGraph}.
*/
public void setResourceStrategy(StreamGraph.ResourceStrategy resourceStrategy) {
this.resourceStrategy = resourceStrategy;
}
/**
* Returns the {@code ResourceStrategy} of this {@code StreamTransformation}.
*
* @see #setResourceStrategy(StreamGraph.ResourceStrategy)
*/
public StreamGraph.ResourceStrategy getResourceStrategy() {
return resourceStrategy;
}
/**
* Returns all transitive predecessor {@code StreamTransformation}s of this {@code StreamTransformation}. This
* is, for example, used when determining whether a feedback edge of an iteration
......@@ -334,10 +339,7 @@ public abstract class StreamTransformation<T> {
if (!name.equals(that.name)) {
return false;
}
if (outputType != null ? !outputType.equals(that.outputType) : that.outputType != null) {
return false;
}
return resourceStrategy == that.resourceStrategy;
return outputType != null ? outputType.equals(that.outputType) : that.outputType == null;
}
@Override
......@@ -347,7 +349,6 @@ public abstract class StreamTransformation<T> {
result = 31 * result + (outputType != null ? outputType.hashCode() : 0);
result = 31 * result + parallelism;
result = 31 * result + (int) (bufferTimeout ^ (bufferTimeout >>> 32));
result = 31 * result + resourceStrategy.hashCode();
return result;
}
}
......@@ -24,15 +24,23 @@ import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.junit.Test;
/**
* This verifies that slot sharing groups are correctly forwarded from user job to JobGraph.
*
* <p>These tests also implicitly verify that chaining does not work across
* resource groups/slot sharing groups.
*/
@SuppressWarnings("serial")
public class SlotAllocationTest {
@Test
public void test() {
public void testTwoPipelines() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
......@@ -40,17 +48,142 @@ public class SlotAllocationTest {
public boolean filter(Long value) { return false; }
};
env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
.disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
.startNewChain().print();
env.generateSequence(1, 10)
.filter(dummyFilter).slotSharingGroup("isolated")
.filter(dummyFilter).slotSharingGroup("default").disableChaining()
.filter(dummyFilter).slotSharingGroup("group 1")
.filter(dummyFilter).startNewChain()
.print().disableChaining();
// verify that a second pipeline does not inherit the groups from the first pipeline
env.generateSequence(1, 10)
.filter(dummyFilter).slotSharingGroup("isolated-2")
.filter(dummyFilter).slotSharingGroup("default").disableChaining()
.filter(dummyFilter).slotSharingGroup("group 2")
.filter(dummyFilter).startNewChain()
.print().disableChaining();
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
assertNotEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
assertEquals(vertices.get(4).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
assertEquals(vertices.get(5).getSlotSharingGroup(), vertices.get(6).getSlotSharingGroup());
int pipelineStart = 6;
assertEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(pipelineStart + 2).getSlotSharingGroup());
assertNotEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(pipelineStart + 1).getSlotSharingGroup());
assertNotEquals(vertices.get(pipelineStart + 2).getSlotSharingGroup(), vertices.get(pipelineStart + 3).getSlotSharingGroup());
assertEquals(vertices.get(pipelineStart + 3).getSlotSharingGroup(), vertices.get(pipelineStart + 4).getSlotSharingGroup());
assertEquals(vertices.get(pipelineStart + 4).getSlotSharingGroup(), vertices.get(pipelineStart + 5).getSlotSharingGroup());
}
@Test
public void testUnion() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
@Override
public boolean filter(Long value) { return false; }
};
DataStream<Long> src1 = env.generateSequence(1, 10);
DataStream<Long> src2 = env.generateSequence(1, 10).slotSharingGroup("src-1");
// this should not inherit group "src-1"
src1.union(src2).filter(dummyFilter);
DataStream<Long> src3 = env.generateSequence(1, 10).slotSharingGroup("group-1");
DataStream<Long> src4 = env.generateSequence(1, 10).slotSharingGroup("group-1");
// this should inherit "group-1" now
src3.union(src4).filter(dummyFilter);
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
// first pipeline
assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1).getSlotSharingGroup());
assertNotEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
assertEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
assertNotEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
// second pipeline
assertEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
assertEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
assertEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
}
/**
 * Checks that explicitly setting the slot sharing group to {@code "default"} on an operation
 * keeps it from inheriting the common group of its inputs.
 *
 * <p>The job is only translated into a {@code JobGraph}; it is never executed.
 */
@Test
public void testInheritOverride() {
// verify that we can explicitly disable inheritance of the input slot sharing groups
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the filter result is irrelevant, the function only serves to build the topology
FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
@Override
public boolean filter(Long value) { return false; }
};
// both inputs share "group-1", so without an override the filter would inherit it
DataStream<Long> src1 = env.generateSequence(1, 10).slotSharingGroup("group-1");
DataStream<Long> src2 = env.generateSequence(1, 10).slotSharingGroup("group-1");
// this should not inherit group but be in "default"
src1.union(src2).filter(dummyFilter).slotSharingGroup("default");
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
// NOTE(review): presumably vertices 0 and 1 are the two sources and vertex 2 is the
// filter, given the topological sort from the sources - confirm the ordering is stable
assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1).getSlotSharingGroup());
assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
assertNotEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
}
/**
 * Checks slot sharing group inheritance for two-input (connected) operations: a co-map only
 * inherits a group from its inputs when both inputs are in the same group.
 *
 * <p>The job is only translated into a {@code JobGraph}; it is never executed.
 */
@Test
public void testCoOperation() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// the mapped values are irrelevant, the function only serves to build the topology
CoMapFunction<Long, Long, Long> dummyCoMap = new CoMapFunction<Long, Long, Long>() {
@Override
public Long map1(Long value) throws Exception {
return null;
}
@Override
public Long map2(Long value) throws Exception {
return null;
}
};
// first pipeline: one input without an explicit group, one input in "src-1"
DataStream<Long> src1 = env.generateSequence(1, 10);
DataStream<Long> src2 = env.generateSequence(1, 10).slotSharingGroup("src-1");
// this should not inherit group "src-1"
src1.connect(src2).map(dummyCoMap);
// second pipeline: both inputs in "group-1"
DataStream<Long> src3 = env.generateSequence(1, 10).slotSharingGroup("group-1");
DataStream<Long> src4 = env.generateSequence(1, 10).slotSharingGroup("group-1");
// this should inherit "group-1" now
src3.connect(src4).map(dummyCoMap);
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
List<JobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
// NOTE(review): the indices below presumably map vertices 0-3 to src1-src4 and
// vertices 4 and 5 to the two co-maps - confirm against the topological ordering
// first pipeline
assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1).getSlotSharingGroup());
assertNotEquals(vertices.get(1).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
// second pipeline
assertEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3).getSlotSharingGroup());
assertEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
assertEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(5).getSlotSharingGroup());
}
}
......@@ -137,8 +137,8 @@ public class StreamTaskTestHarness<OUT> {
};
List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode sourceVertexDummy = new StreamNode(null, 0, "group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(null, 1, "group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
streamConfig.setOutEdgesInOrder(outEdgesInOrder);
......@@ -200,7 +200,7 @@ public class StreamTaskTestHarness<OUT> {
}
else {
if (taskThread.task instanceof StreamTask) {
StreamTask streamTask = (StreamTask) taskThread.task;
StreamTask<?, ?> streamTask = (StreamTask<?, ?>) taskThread.task;
while (!streamTask.isRunning()) {
Thread.sleep(100);
}
......
......@@ -116,8 +116,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTest
private static final long serialVersionUID = 1L;
};
StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode sourceVertexDummy = new StreamNode(null, 0, "default group", dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
StreamNode targetVertexDummy = new StreamNode(null, 1, "default group", dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
for (int i = 0; i < numInputGates; i++) {
......
......@@ -145,7 +145,7 @@ class DataStream[T](stream: JavaStream[T]) {
@PublicEvolving
def disableChaining(): DataStream[T] = {
stream match {
case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining();
case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining()
case _ =>
throw new UnsupportedOperationException("Only supported for operators.")
}
......@@ -161,7 +161,7 @@ class DataStream[T](stream: JavaStream[T]) {
@PublicEvolving
def startNewChain(): DataStream[T] = {
stream match {
case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain();
case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain()
case _ =>
throw new UnsupportedOperationException("Only supported for operators.")
}
......@@ -169,37 +169,23 @@ class DataStream[T](stream: JavaStream[T]) {
}
/**
* Isolates the operator in its own resource group. This will cause the
* operator to grab as many task slots as its degree of parallelism. If
* there are no free resources available, the job will fail to start.
* All subsequent operators are assigned to the default resource group.
* Sets the slot sharing group of this operation. Parallel instances of
* operations that are in the same slot sharing group will be co-located in the same
* TaskManager slot, if possible.
*
* Operations inherit the slot sharing group of input operations if all input operations
* are in the same slot sharing group and no slot sharing group was explicitly specified.
*
* Initially an operation is in the default slot sharing group. An operation can be put into
* the default group explicitly by setting the slot sharing group to `"default"`.
*
* @param slotSharingGroup The slot sharing group name.
*/
@PublicEvolving
def isolateResources(): DataStream[T] = {
stream match {
case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources();
case _ =>
throw new UnsupportedOperationException("Only supported for operators.")
}
this
}
/**
* By default all operators in a streaming job share the same resource
* group. Each resource group takes as many task manager slots as the
* maximum parallelism operator in that group. By calling this method, this
* operators starts a new resource group and all subsequent operators will
* be added to this group unless specified otherwise. Please note that
* local executions have by default as many available task slots as the
* environment parallelism, so in order to start a new resource group the
* degree of parallelism for the operators must be decreased from the
* default.
*/
@PublicEvolving
def startNewResourceGroup(): DataStream[T] = {
def slotSharingGroup(slotSharingGroup: String): DataStream[T] = {
stream match {
case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup();
case ds: SingleOutputStreamOperator[_, _] => ds.slotSharingGroup(slotSharingGroup)
case sink: DataStreamSink[_] => sink.slotSharingGroup(slotSharingGroup)
case _ =>
throw new UnsupportedOperationException("Only supported for operators.")
}
......@@ -216,7 +202,7 @@ class DataStream[T](stream: JavaStream[T]) {
*/
def setBufferTimeout(timeoutMillis: Long): DataStream[T] = {
stream match {
case ds: SingleOutputStreamOperator[_, _] => ds.setBufferTimeout(timeoutMillis);
case ds: SingleOutputStreamOperator[_, _] => ds.setBufferTimeout(timeoutMillis)
case _ =>
throw new UnsupportedOperationException("Only supported for operators.")
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.runtime.jobgraph.{JobVertex, JobGraph}
import org.junit.Assert._
import org.junit.Test
/**
 * Checks that the slot sharing groups configured on a user job end up on the
 * vertices of the generated JobGraph.
 *
 * These tests also implicitly verify that chaining does not work across
 * resource groups/slot sharing groups.
 */
class SlotAllocationTest {

  /**
   * Builds two structurally identical pipelines that use different group names and
   * compares the slot sharing groups of the resulting job vertices.
   */
  @Test
  def testSlotGroups(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // The filter logic is irrelevant here; only the resulting job graph is inspected.
    val dummyFilter = new FilterFunction[Long]() {
      override def filter(value: Long): Boolean = false
    }

    // Adds one source -> filter x4 -> print pipeline to the environment, using the
    // given names for the first and third filter's slot sharing group.
    def addPipeline(firstGroup: String, thirdGroup: String): Unit = {
      env.generateSequence(1, 10)
        .filter(dummyFilter).slotSharingGroup(firstGroup)
        .filter(dummyFilter).slotSharingGroup("default").disableChaining()
        .filter(dummyFilter).slotSharingGroup(thirdGroup)
        .filter(dummyFilter)
        .startNewChain()
        .print().disableChaining()
    }

    addPipeline("isolated", "group 1")

    // verify that a second pipeline does not inherit the groups from the first pipeline
    addPipeline("isolated-2", "group 2")

    val jobGraph: JobGraph = env.getStreamGraph.getJobGraph
    val vertices = jobGraph.getVerticesSortedTopologicallyFromSources

    // Slot sharing group of the vertex at the given position of the sorted list.
    def groupOf(index: Int) = vertices.get(index).getSlotSharingGroup

    // NOTE(review): the index-to-operator mapping relies on the topological
    // ordering of the job graph - confirm if the ordering ever changes.
    assertEquals(groupOf(0), groupOf(3))
    assertNotEquals(groupOf(0), groupOf(2))
    assertNotEquals(groupOf(3), groupOf(4))
    assertEquals(groupOf(4), groupOf(5))
    assertEquals(groupOf(5), groupOf(6))

    // Offset used to address the vertices checked for the second pipeline.
    val offset: Int = 6
    assertEquals(groupOf(1), groupOf(offset + 2))
    assertNotEquals(groupOf(1), groupOf(offset + 1))
    assertNotEquals(groupOf(offset + 2), groupOf(offset + 3))
    assertEquals(groupOf(offset + 3), groupOf(offset + 4))
    assertEquals(groupOf(offset + 4), groupOf(offset + 5))
  }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册