提交 7ee719f7 编写于 作者: S Stephan Ewen

[FLINK-3255] [streaming] Disable parallelism-dependent chaining optimization

This closes #1518
上级 4806158e
......@@ -465,7 +465,7 @@ public class StreamGraph extends StreamingPlan {
}
public Set<Tuple2<Integer, StreamOperator<?>>> getOperators() {
Set<Tuple2<Integer, StreamOperator<?>>> operatorSet = new HashSet<Tuple2<Integer, StreamOperator<?>>>();
Set<Tuple2<Integer, StreamOperator<?>>> operatorSet = new HashSet<>();
for (StreamNode vertex : streamNodes.values()) {
operatorSet.add(new Tuple2<Integer, StreamOperator<?>>(vertex.getId(), vertex
.getOperator()));
......@@ -496,7 +496,7 @@ public class StreamGraph extends StreamingPlan {
sinks.add(sink.getId());
setParallelism(sink.getId(), parallelism);
iterationSourceSinkPairs.add(new Tuple2<StreamNode, StreamNode>(source, sink));
iterationSourceSinkPairs.add(new Tuple2<>(source, sink));
source.setOperatorName("IterationSource-" + loopId);
sink.setOperatorName("IterationSink-" + loopId);
......@@ -505,7 +505,7 @@ public class StreamGraph extends StreamingPlan {
this.vertexIDtoLoopTimeout.put(source.getId(), timeout);
this.vertexIDtoLoopTimeout.put(sink.getId(), timeout);
return new Tuple2<StreamNode, StreamNode>(source, sink);
return new Tuple2<>(source, sink);
}
public Set<Tuple2<StreamNode, StreamNode>> getIterationSourceSinkPairs() {
......@@ -518,7 +518,7 @@ public class StreamGraph extends StreamingPlan {
}
private void removeVertex(StreamNode toRemove) {
Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
Set<StreamEdge> edgesToRemove = new HashSet<>();
edgesToRemove.addAll(toRemove.getInEdges());
edgesToRemove.addAll(toRemove.getOutEdges());
......
......@@ -229,14 +229,14 @@ public class StreamingJobGraphGenerator {
return transitiveOutEdges;
} else {
return new ArrayList<StreamEdge>();
return new ArrayList<>();
}
}
private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
String operatorName = streamGraph.getStreamNode(vertexID).getOperatorName();
if (chainedOutputs.size() > 1) {
List<String> outputChainedNames = new ArrayList<String>();
List<String> outputChainedNames = new ArrayList<>();
for (StreamEdge chainable : chainedOutputs) {
outputChainedNames.add(chainedNames.get(chainable.getTargetId()));
}
......@@ -395,8 +395,7 @@ public class StreamingJobGraphGenerator {
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS ||
headOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
&& (edge.getPartitioner() instanceof ForwardPartitioner || downStreamVertex
.getParallelism() == 1)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& (streamGraph.isChainingEnabled() ||
outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS);
......@@ -404,7 +403,7 @@ public class StreamingJobGraphGenerator {
private void setSlotSharing() {
Map<Integer, SlotSharingGroup> slotSharingGroups = new HashMap<Integer, SlotSharingGroup>();
Map<Integer, SlotSharingGroup> slotSharingGroups = new HashMap<>();
for (Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
......@@ -446,15 +445,15 @@ public class StreamingJobGraphGenerator {
// collect the vertices that receive "trigger checkpoint" messages.
// currently, these are all the sources
List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();
List<JobVertexID> triggerVertices = new ArrayList<>();
// collect the vertices that need to acknowledge the checkpoint
// currently, these are all vertices
List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());
List<JobVertexID> ackVertices = new ArrayList<>(jobVertices.size());
// collect the vertices that receive "commit checkpoint" messages
// currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
List<JobVertexID> commitVertices = new ArrayList<>();
for (JobVertex vertex : jobVertices.values()) {
if (vertex.isInputVertex()) {
......
......@@ -51,7 +51,7 @@ import static org.junit.Assert.assertTrue;
* specific tests, for example in {@link org.apache.flink.streaming.api.IterateTest} for
* iterations.
*/
public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
public class StreamGraphGeneratorTest {
/**
* This tests whether virtual Transformations behave correctly.
......@@ -282,14 +282,12 @@ public class StreamGraphGeneratorTest extends StreamingMultipleProgramsTestBase
}
@Override
public void processElement(StreamRecord<Integer> element) throws Exception {
public void processElement(StreamRecord<Integer> element) {
output.collect(element);
}
@Override
public void processWatermark(Watermark mark) throws Exception {
}
public void processWatermark(Watermark mark) {}
@Override
public void setOutputType(TypeInformation<Integer> outTypeInfo, ExecutionConfig executionConfig) {
......
......@@ -14,29 +14,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.graph;
import java.io.IOException;
import java.util.Random;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.InstantiationUtil;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamingJobGraphGeneratorTest extends StreamingMultipleProgramsTestBase {
private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGeneratorTest.class);
import static org.junit.Assert.*;
public class StreamingJobGraphGeneratorTest {
@Test
public void testExecutionConfigSerialization() throws IOException, ClassNotFoundException {
final long seed = System.currentTimeMillis();
LOG.info("Test seed: {}", new Long(seed));
final Random r = new Random(seed);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
......@@ -76,16 +77,62 @@ public class StreamingJobGraphGeneratorTest extends StreamingMultipleProgramsTes
config.setParallelism(dop);
JobGraph jobGraph = compiler.createJobGraph("test");
ExecutionConfig executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
ExecutionConfig executionConfig = InstantiationUtil.readObjectFromConfig(
jobGraph.getJobConfiguration(),
ExecutionConfig.CONFIG_KEY,
Thread.currentThread().getContextClassLoader());
Assert.assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
Assert.assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
Assert.assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
Assert.assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
Assert.assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
Assert.assertEquals(dop, executionConfig.getParallelism());
assertNotNull(executionConfig);
assertEquals(closureCleanerEnabled, executionConfig.isClosureCleanerEnabled());
assertEquals(forceAvroEnabled, executionConfig.isForceAvroEnabled());
assertEquals(forceKryoEnabled, executionConfig.isForceKryoEnabled());
assertEquals(objectReuseEnabled, executionConfig.isObjectReuseEnabled());
assertEquals(sysoutLoggingEnabled, executionConfig.isSysoutLoggingEnabled());
assertEquals(dop, executionConfig.getParallelism());
}
/**
 * Builds a small pipeline (source, map, keyBy, map, sink) with the environment
 * parallelism set to 1 and inspects the job graph generated from it.
 *
 * <p>The job graph is expected to contain exactly two vertices, each with parallelism 1.
 * As the test name indicates, the point is that running with parallelism 1 must not cause
 * the operators on both sides of the {@code keyBy} to end up in a single vertex.
 */
@Test
public void testParallelismOneNotChained() {

	// --------- the program ---------

	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setParallelism(1);

	// first stage: turn every element into a (value, value) pair
	DataStream<Tuple2<String, String>> pairs = env
		.fromElements("a", "b", "c", "d", "e", "f")
		.map(new MapFunction<String, Tuple2<String, String>>() {

			@Override
			public Tuple2<String, String> map(String value) {
				return new Tuple2<>(value, value);
			}
		});

	// second stage: key by the first tuple field, pass the records through unchanged,
	// and hand them to a sink that discards them
	pairs
		.keyBy(0)
		.map(new MapFunction<Tuple2<String, String>, Tuple2<String, String>>() {

			@Override
			public Tuple2<String, String> map(Tuple2<String, String> value) {
				return value;
			}
		})
		.addSink(new SinkFunction<Tuple2<String, String>>() {

			@Override
			public void invoke(Tuple2<String, String> value) {}
		});

	// --------- the job graph ---------

	StreamGraph graph = env.getStreamGraph();
	graph.setJobName("test job");
	JobGraph jobGraph = graph.getJobGraph();

	assertEquals(2, jobGraph.getNumberOfVertices());

	// both vertices must carry the parallelism configured on the environment
	for (int i = 0; i < 2; i++) {
		assertEquals(1, jobGraph.getVerticesAsArray()[i].getParallelism());
	}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册