Commit ca7f36e1 authored by Gyula Fora

[streaming] Discretizer sharing added with further window optimizations for better chaining

Parent e5528a25
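
For context, a minimal sketch of the pattern this commit optimizes: two windows with identical policies reading the same input used to produce two identical discretizer vertices, which the JobGraph generator can now collapse into one shared StreamDiscretizer. (Hedged example; env.fromElements and print are assumed from this API version, the rest mirrors the test changes below.)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.helper.Count;

public class SharedDiscretizerExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> source = env.fromElements(1, 2, 3, 4, 5, 6);

        // Same trigger/eviction (Count.of(2)) and the same upstream vertex:
        // after this commit both pipelines are served by a single discretizer.
        source.window(Count.of(2)).sum(0).getDiscretizedStream().print();
        source.window(Count.of(2)).max(0).getDiscretizedStream().print();

        env.execute();
    }
}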
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
@@ -256,9 +257,10 @@ public class StreamGraph extends StreamingPlan {
addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism);
addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), new StreamRecordSerializer<OUT>(
outTypeInfo, executionConfig), null);
addTypeSerializers(vertexName,
new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig),
new StreamRecordSerializer<OUT>(outTypeInfo, executionConfig), null);
if (LOG.isDebugEnabled()) {
LOG.debug("CO-TASK: {}", vertexName);
@@ -319,6 +321,17 @@ public class StreamGraph extends StreamingPlan {
selectedNames.get(upStreamVertexName).add(outputNames);
}
public void removeEdge(String upStream, String downStream) {
int inputIndex = getInEdges(downStream).indexOf(upStream);
inEdgeLists.get(downStream).remove(inputIndex);
int outputIndex = getOutEdges(upStream).indexOf(downStream);
outEdgeLists.get(upStream).remove(outputIndex);
outEdgeTypes.get(upStream).remove(outputIndex);
selectedNames.get(upStream).remove(outputIndex);
outputPartitioners.get(upStream).remove(outputIndex);
}
private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1,
StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1,
StreamRecordSerializer<?> out2) {
@@ -404,7 +417,8 @@ public class StreamGraph extends StreamingPlan {
}
public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType, executionConfig);
StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType,
executionConfig);
typeSerializersOut1.put(id, serializer);
}
@@ -480,6 +494,10 @@ public class StreamGraph extends StreamingPlan {
this.chaining = chaining;
}
public Set<Entry<String, StreamInvokable<?, ?>>> getInvokables() {
return invokableObjects.entrySet();
}
public Collection<String> getSources() {
return sources;
}
......
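
A note on the new removeEdge above: StreamGraph keeps one list per edge attribute (out edges, edge types, selected names, partitioners), so every list must drop the entry at the same index or the attributes drift out of alignment with the edges. A minimal standalone illustration of that invariant (hypothetical names, not StreamGraph's actual fields):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParallelListRemoval {
    public static void main(String[] args) {
        List<String> outEdges = new ArrayList<String>(Arrays.asList("map-1", "sink-2"));
        List<String> partitioners = new ArrayList<String>(Arrays.asList("FORWARD", "SHUFFLE"));

        int index = outEdges.indexOf("map-1");
        outEdges.remove(index);     // drop the edge entry...
        partitioners.remove(index); // ...and its attribute at the same index

        System.out.println(outEdges + " " + partitioners); // [sink-2] [SHUFFLE]
    }
}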
@@ -23,6 +23,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
@@ -71,20 +74,97 @@ public class StreamingJobGraphGenerator {
public JobGraph createJobGraph(String jobName) {
jobGraph = new JobGraph(jobName);
// Turn lazy scheduling off
jobGraph.setScheduleMode(ScheduleMode.ALL);
init();
for (String sourceName : streamGraph.getSources()) {
createChain(sourceName, sourceName);
}
applyWindowOptimizations();
setChaining();
setSlotSharing();
return jobGraph;
}
private void applyWindowOptimizations() {
Set<Entry<String, StreamInvokable<?, ?>>> invokables = streamGraph.getInvokables();
List<Tuple2<String, StreamDiscretizer<?>>> discretizers = new ArrayList<Tuple2<String, StreamDiscretizer<?>>>();
// Get the discretizers
for (Entry<String, StreamInvokable<?, ?>> entry : invokables) {
if (entry.getValue() instanceof StreamDiscretizer) {
discretizers.add(new Tuple2<String, StreamDiscretizer<?>>(entry.getKey(),
(StreamDiscretizer<?>) entry.getValue()));
}
}
// Share common discretizers
setDiscretizerReuse(discretizers);
}
private void setDiscretizerReuse(List<Tuple2<String, StreamDiscretizer<?>>> discretizers) {
List<Tuple2<StreamDiscretizer<?>, List<String>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<String>>>();
for (Tuple2<String, StreamDiscretizer<?>> discretizer : discretizers) {
boolean inMatching = false;
for (Tuple2<StreamDiscretizer<?>, List<String>> matching : matchingDiscretizers) {
Set<String> discretizerInEdges = new HashSet<String>(
streamGraph.getInEdges(discretizer.f0));
Set<String> matchingInEdges = new HashSet<String>(
streamGraph.getInEdges(matching.f1.get(0)));
if (discretizer.f1.equals(matching.f0)
&& discretizerInEdges.equals(matchingInEdges)) {
matching.f1.add(discretizer.f0);
inMatching = true;
break;
}
}
if (!inMatching) {
List<String> matchingNames = new ArrayList<String>();
matchingNames.add(discretizer.f0);
matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<String>>(
discretizer.f1, matchingNames));
}
}
for (Tuple2<StreamDiscretizer<?>, List<String>> matching : matchingDiscretizers) {
List<String> matchList = matching.f1;
if (matchList.size() > 1) {
String first = matchList.get(0);
for (int i = 1; i < matchList.size(); i++) {
replaceDiscretizer(matchList.get(i), first);
}
}
}
}
private void replaceDiscretizer(String toReplace, String replaceWith) {
// Copy the out-edge list; it is mutated while the edges are reconnected below
List<String> outEdges = new ArrayList<String>(streamGraph.getOutEdges(toReplace));
int numOutputs = outEdges.size();
// Reconnect outputs
for (int i = 0; i < numOutputs; i++) {
String outName = outEdges.get(i);
streamGraph.setEdge(replaceWith, outName,
streamGraph.getOutPartitioner(toReplace, outName), 0, new ArrayList<String>());
streamGraph.removeEdge(toReplace, outName);
}
}
private void setChaining() {
for (String sourceName : streamGraph.getSources()) {
createChain(sourceName, sourceName);
}
}
private List<Tuple2<String, String>> createChain(String startNode, String current) {
if (!builtNodes.contains(startNode)) {
......
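
The reuse check in setDiscretizerReuse above reduces to two conditions: the discretizers compare equal (same trigger and eviction policies) and they consume the same set of upstream vertices. A hedged restatement as a standalone predicate (canShare is illustrative only, not part of the generator):

import java.util.HashSet;
import java.util.List;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;

public class DiscretizerMatch {
    // Mirrors the per-pair test: equal policies AND identical input edge sets.
    static boolean canShare(StreamDiscretizer<?> a, List<String> aInEdges,
            StreamDiscretizer<?> b, List<String> bInEdges) {
        return a.equals(b)
                && new HashSet<String>(aInEdges).equals(new HashSet<String>(bInEdges));
    }
}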
@@ -73,7 +73,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
WindowTransformation.REDUCEWINDOW, "Window Reduce", getType(),
new WindowReducer<OUT>(reduceFunction)).merge();
if (!isGrouped()) {
if (!isGrouped() && out.discretizedStream.invokable instanceof WindowMerger) {
return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", out.getType(),
new WindowReducer<OUT>(discretizedStream.clean(reduceFunction)));
} else {
@@ -122,7 +122,9 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
out.groupByKey = null;
return out;
} else if (transformation == WindowTransformation.MAPWINDOW) {
} else if (transformation != WindowTransformation.MAPWINDOW
&& parallelism != discretizedStream.getExecutionEnvironment()
.getDegreeOfParallelism()) {
return transform(transformation, "Window partitioner", getType(),
new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
} else {
@@ -137,8 +139,14 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
private DiscretizedStream<OUT> merge() {
TypeInformation<StreamWindow<OUT>> type = discretizedStream.getType();
return wrap(discretizedStream.groupBy(new WindowKey<OUT>()).transform("Window Merger",
type, new WindowMerger<OUT>()));
// Only merge partitioned streams
if (discretizedStream.invokable instanceof WindowPartitioner) {
return wrap(discretizedStream.groupBy(new WindowKey<OUT>()).transform("Window Merger",
type, new WindowMerger<OUT>()));
} else {
return this;
}
}
@SuppressWarnings("rawtypes")
......
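
Two consequences of the DiscretizedStream changes above: merge() is now a no-op unless a WindowPartitioner actually preceded it, and the extra "Window partitioner" step is skipped when the requested parallelism already equals the environment's degree of parallelism. For a plain non-grouped reduce this removes two repartitioning steps, so discretizer and reducer can chain into one task. A hedged usage sketch (fromElements and print assumed, as before):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.helper.Count;

public class ChainedWindowReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Non-grouped and no WindowPartitioner before the reduce: merge()
        // returns the stream unchanged, so the whole pipeline can be chained.
        env.fromElements(1, 2, 3, 4).window(Count.of(2)).sum(0)
                .getDiscretizedStream().print();
        env.execute();
    }
}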
@@ -125,4 +125,9 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
}
}
@Override
public String toString() {
return "GroupedDiscretizer(Key: " + keySelector.getClass().getSimpleName() + ", Trigger: "
+ triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString() + ")";
}
}
@@ -213,4 +213,10 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, WindowEvent<IN>>
}
}
}
@Override
public String toString() {
return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString()
+ ")";
}
}
@@ -135,4 +135,9 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN> {
}
}
}
@Override
public String toString() {
return "CountPolicy(" + maxElements + ")";
}
}
@@ -101,4 +101,9 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN> {
}
}
}
@Override
public String toString() {
return "CountPolicy(" + max + ")";
}
}
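
The new toString implementations make window configurations legible in logs and in the Discretizer(...) strings above. One observable quirk: CountTriggerPolicy and CountEvictionPolicy both render as CountPolicy(n), so only the surrounding Trigger:/Eviction: labels tell them apart. A hedged demo (constructor signatures assumed from this version):

import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;

public class PolicyToStringDemo {
    public static void main(String[] args) {
        // Both lines print "CountPolicy(3)" despite being different policy types.
        System.out.println(new CountTriggerPolicy<Integer>(3));
        System.out.println(new CountEvictionPolicy<Integer>(3));
    }
}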
@@ -128,4 +128,9 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
}
}
}
@Override
public String toString() {
return "DeltaPolicy(" + threshold + ", " + deltaFuntion.getClass().getSimpleName() + ")";
}
}
@@ -135,4 +135,10 @@ public class PunctuationPolicy<IN, DATA> implements CloneableTriggerPolicy<IN>,
}
}
}
@Override
public String toString() {
return "PunctuationPolicy(" + punctuation + extractor != null ? ", "
+ extractor.getClass().getSimpleName() : "" + ")";
}
}
@@ -150,4 +150,10 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
}
}
@Override
public String toString() {
return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
+ ")";
}
}
@@ -213,4 +213,10 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
}
}
@Override
public String toString() {
return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
+ ")";
}
}
@@ -96,4 +96,9 @@ public class TumblingEvictionPolicy<DATA> implements CloneableEvictionPolicy<DAT
return true;
}
}
@Override
public String toString() {
return "TumblingPolicy";
}
}
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.function.WindowMapFunction;
import org.apache.flink.streaming.api.function.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -97,12 +99,29 @@ public class WindowIntegrationTest implements Serializable {
source.window(Count.of(4)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
.flatten().addSink(new CentralSink2());
source.groupBy(new ModKey(3)).window(Count.of(2)).sum(0).getDiscretizedStream()
KeySelector<Integer, ?> key = new ModKey(2);
Timestamp<Integer> ts = new Timestamp<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public long getTimestamp(Integer value) {
return value;
}
};
source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
.addSink(new DistributedSink1());
source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2());
source.window(Count.of(2)).every(Count.of(3)).min(0).getDiscretizedStream()
.addSink(new CentralSink3());
source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
.addSink(new DistributedSink3());
env.execute();
// sum ( Count of 2 slide 3 )
@@ -123,14 +142,13 @@ public class WindowIntegrationTest implements Serializable {
validateOutput(expected2, CentralSink2.windows);
// groupby mod 3 sum ( Tumbling Count of 2)
// groupby mod 2 sum ( Tumbling Time of 4)
List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
expected3.add(StreamWindow.fromElements(4));
expected3.add(StreamWindow.fromElements(5));
expected3.add(StreamWindow.fromElements(16));
expected3.add(StreamWindow.fromElements(22));
expected3.add(StreamWindow.fromElements(8));
expected3.add(StreamWindow.fromElements(10));
expected3.add(StreamWindow.fromElements(11));
expected3.add(StreamWindow.fromElements(3));
validateOutput(expected3, DistributedSink1.windows);
@@ -146,6 +164,22 @@ public class WindowIntegrationTest implements Serializable {
validateOutput(expected4, DistributedSink2.windows);
// min ( Count of 2 slide 3 )
List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
expected5.add(StreamWindow.fromElements(2));
expected5.add(StreamWindow.fromElements(4));
expected5.add(StreamWindow.fromElements(11));
validateOutput(expected5, CentralSink3.windows);
// groupby mod 2 max ( Tumbling Time of 4)
List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
expected6.add(StreamWindow.fromElements(3));
expected6.add(StreamWindow.fromElements(5));
expected6.add(StreamWindow.fromElements(11));
expected6.add(StreamWindow.fromElements(4));
expected6.add(StreamWindow.fromElements(10));
}
public static <R> void validateOutput(List<R> expected, List<R> actual) {
@@ -178,6 +212,19 @@ public class WindowIntegrationTest implements Serializable {
}
@SuppressWarnings("serial")
private static class CentralSink3 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@Override
public void invoke(StreamWindow<Integer> value) throws Exception {
windows.add(value);
}
}
@SuppressWarnings("serial")
private static class DistributedSink1 implements SinkFunction<StreamWindow<Integer>> {
@@ -203,4 +250,17 @@ public class WindowIntegrationTest implements Serializable {
}
}
@SuppressWarnings("serial")
private static class DistributedSink3 implements SinkFunction<StreamWindow<Integer>> {
public static List<StreamWindow<Integer>> windows = Collections
.synchronizedList(new ArrayList<StreamWindow<Integer>>());
@Override
public void invoke(StreamWindow<Integer> value) throws Exception {
windows.add(value);
}
}
}