diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 4800c96d6a8d876e1e03a90d47de63376cd5143f..c87abb54c5cff8f3b08b1d5665ce6087077a0e59 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.flink.api.common.ExecutionConfig; @@ -256,9 +257,10 @@ public class StreamGraph extends StreamingPlan { addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism); - addTypeSerializers(vertexName, new StreamRecordSerializer(in1TypeInfo, executionConfig), - new StreamRecordSerializer(in2TypeInfo, executionConfig), new StreamRecordSerializer( - outTypeInfo, executionConfig), null); + addTypeSerializers(vertexName, + new StreamRecordSerializer(in1TypeInfo, executionConfig), + new StreamRecordSerializer(in2TypeInfo, executionConfig), + new StreamRecordSerializer(outTypeInfo, executionConfig), null); if (LOG.isDebugEnabled()) { LOG.debug("CO-TASK: {}", vertexName); @@ -319,6 +321,17 @@ public class StreamGraph extends StreamingPlan { selectedNames.get(upStreamVertexName).add(outputNames); } + public void removeEdge(String upStream, String downStream) { + int inputIndex = getInEdges(downStream).indexOf(upStream); + inEdgeLists.get(downStream).remove(inputIndex); + + int outputIndex = getOutEdges(upStream).indexOf(downStream); + outEdgeLists.get(upStream).remove(outputIndex); + outEdgeTypes.get(upStream).remove(outputIndex); + selectedNames.get(upStream).remove(outputIndex); + 
outputPartitioners.get(upStream).remove(outputIndex); + } + private void addTypeSerializers(String vertexName, StreamRecordSerializer in1, StreamRecordSerializer in2, StreamRecordSerializer out1, StreamRecordSerializer out2) { @@ -404,7 +417,8 @@ public class StreamGraph extends StreamingPlan { } public void setOutType(String id, TypeInformation outType) { - StreamRecordSerializer serializer = new StreamRecordSerializer(outType, executionConfig); + StreamRecordSerializer serializer = new StreamRecordSerializer(outType, + executionConfig); typeSerializersOut1.put(id, serializer); } @@ -480,6 +494,10 @@ public class StreamGraph extends StreamingPlan { this.chaining = chaining; } + public Set>> getInvokables() { + return invokableObjects.entrySet(); + } + public Collection getSources() { return sources; } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java index d57bfc04731c29684834d3d85de286a48213dc69..f3edb78221f654b2f2cda6507613c80f879d0621 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -23,6 +23,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.java.tuple.Tuple2; @@ -36,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.streaming.api.invokable.StreamInvokable; import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy; +import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer; import org.apache.flink.streaming.api.streamvertex.StreamIterationHead; import org.apache.flink.streaming.api.streamvertex.StreamIterationTail; import org.apache.flink.streaming.partitioner.StreamPartitioner; @@ -71,20 +74,97 @@ public class StreamingJobGraphGenerator { public JobGraph createJobGraph(String jobName) { jobGraph = new JobGraph(jobName); + // Turn lazy scheduling off jobGraph.setScheduleMode(ScheduleMode.ALL); init(); - for (String sourceName : streamGraph.getSources()) { - createChain(sourceName, sourceName); - } + applyWindowOptimizations(); + + setChaining(); setSlotSharing(); return jobGraph; } + private void applyWindowOptimizations() { + Set>> invokables = streamGraph.getInvokables(); + List>> discretizers = new ArrayList>>(); + + // Get the discretizers + for (Entry> entry : invokables) { + if (entry.getValue() instanceof StreamDiscretizer) { + discretizers.add(new Tuple2>(entry.getKey(), + (StreamDiscretizer) entry.getValue())); + } + } + + // Share common discretizers + setDiscretizerReuse(discretizers); + + } + + private void setDiscretizerReuse(List>> discretizers) { + List, List>> matchingDiscretizers = new ArrayList, List>>(); + + for (Tuple2> discretizer : discretizers) { + boolean inMatching = false; + for (Tuple2, List> matching : matchingDiscretizers) { + Set discretizerInEdges = new HashSet( + streamGraph.getInEdges(discretizer.f0)); + Set matchingInEdges = new HashSet( + streamGraph.getInEdges(matching.f1.get(0))); + + if (discretizer.f1.equals(matching.f0) + && discretizerInEdges.equals(matchingInEdges)) { + matching.f1.add(discretizer.f0); + inMatching = true; + break; + } + } + if (!inMatching) { + List matchingNames = new ArrayList(); + matchingNames.add(discretizer.f0); + matchingDiscretizers.add(new Tuple2, List>( + discretizer.f1, matchingNames)); + } + } + + for 
(Tuple2, List> matching : matchingDiscretizers) { + List matchList = matching.f1; + if (matchList.size() > 1) { + String first = matchList.get(0); + for (int i = 1; i < matchList.size(); i++) { + replaceDiscretizer(matchList.get(i), first); + } + } + } + } + + private void replaceDiscretizer(String toReplace, String replaceWith) { + // Work on a copy: removeEdge() below removes entries from the graph's out-edge list + List outEdges = new ArrayList(streamGraph.getOutEdges(toReplace)); + + int numOutputs = outEdges.size(); + + // Reconnect outputs + for (int i = 0; i < numOutputs; i++) { + String outName = outEdges.get(i); + + streamGraph.setEdge(replaceWith, outName, + streamGraph.getOutPartitioner(toReplace, outName), 0, new ArrayList()); + streamGraph.removeEdge(toReplace, outName); + } + } + + private void setChaining() { + for (String sourceName : streamGraph.getSources()) { + createChain(sourceName, sourceName); + } + } + private List> createChain(String startNode, String current) { if (!builtNodes.contains(startNode)) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java index bc9f5b9db91775bd45abeb177bd0f59013a39c70..0c84d0a1e6a03e56678de03f3ad799e928d81d17 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java @@ -73,7 +73,7 @@ public class DiscretizedStream extends WindowedDataStream { WindowTransformation.REDUCEWINDOW, "Window Reduce", getType(), new WindowReducer(reduceFunction)).merge(); - if (!isGrouped()) { + if (!isGrouped() && out.discretizedStream.invokable instanceof WindowMerger) { return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", 
out.getType(), new WindowReducer(discretizedStream.clean(reduceFunction))); } else { @@ -122,7 +122,9 @@ public class DiscretizedStream extends WindowedDataStream { out.groupByKey = null; return out; - } else if (transformation == WindowTransformation.MAPWINDOW) { + } else if (transformation != WindowTransformation.MAPWINDOW + && parallelism != discretizedStream.getExecutionEnvironment() + .getDegreeOfParallelism()) { return transform(transformation, "Window partitioner", getType(), new WindowPartitioner(parallelism)).setParallelism(parallelism); } else { @@ -137,8 +139,14 @@ public class DiscretizedStream extends WindowedDataStream { private DiscretizedStream merge() { TypeInformation> type = discretizedStream.getType(); - return wrap(discretizedStream.groupBy(new WindowKey()).transform("Window Merger", - type, new WindowMerger())); + // Only merge partitioned streams + if (discretizedStream.invokable instanceof WindowPartitioner) { + return wrap(discretizedStream.groupBy(new WindowKey()).transform("Window Merger", + type, new WindowMerger())); + } else { + return this; + } + } @SuppressWarnings("rawtypes") diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java index ec01e28b979b34f90f327f05031a34dccc6c493e..5e21a31a6bfb746f1fe6f7216e485aea6345f4a4 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java @@ -125,4 +125,9 @@ public class GroupedStreamDiscretizer extends StreamDiscretizer { } } + @Override + public String toString() { 
+ return "GroupedDiscretizer(Key: " + keySelector.getClass().getSimpleName() + ", Trigger: " + + triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString() + ")"; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java index fb65283e1a1487eb8705697c2f34831e1b7cb635..104196ec4097066cd04d46213dbed1bd6a04e172 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java @@ -213,4 +213,10 @@ public class StreamDiscretizer extends StreamInvokable> } } } + + @Override + public String toString() { + return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString() + + ")"; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java index b4bdd43d7a2c648ca3f6a12b292e3fb9b83234a5..3ede27b9df9b0cb9ad00001f83467ca815ed2a4d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java @@ -135,4 +135,9 @@ public class CountEvictionPolicy implements CloneableEvictionPolicy { } } } + + @Override + public String toString() { + return "CountPolicy(" + maxElements + ")"; 
+ } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java index c814f5d791b7861579447e1f397238952925119c..6d8149ab7988ce60abca60d738116cbaf39f9580 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java @@ -101,4 +101,9 @@ public class CountTriggerPolicy implements CloneableTriggerPolicy { } } } + + @Override + public String toString() { + return "CountPolicy(" + max + ")"; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java index 169196ff6183aca6c2619dd7c2e2f49f3151253a..0811e0552b94bd06ae8ec02106b0b9bda0f26d4c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java @@ -128,4 +128,9 @@ public class DeltaPolicy implements CloneableTriggerPolicy, } } } + + @Override + public String toString() { + return "DeltaPolicy(" + threshold + ", " + deltaFuntion.getClass().getSimpleName() + ")"; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java index 4b06aeefc0e2779a712fd3283c8e52eda444ebb4..e37c44339874baa0eb86920a92cffa455724877c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java @@ -135,4 +135,11 @@ public class PunctuationPolicy implements CloneableTriggerPolicy, } } } + + @Override + public String toString() { + // Parenthesize the conditional: '+' binds tighter than '!=' and '?:' + return "PunctuationPolicy(" + punctuation + + (extractor != null ? ", " + extractor.getClass().getSimpleName() : "") + ")"; + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java index de715af2509b2831c4d6ee59aed1e8f462a73b7e..982b6d5f2ae39702eb916b016f49b35ba89c678c 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java @@ -150,4 +150,10 @@ public class TimeEvictionPolicy implements ActiveEvictionPolicy, } } + @Override + public String toString() { + return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName() + + ")"; + } + } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java index 
84eccf56612b8090720f155a1950f8ffc36f0541..7065582f2c9ad087919a6128a00c748b3f5f37c9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java @@ -213,4 +213,10 @@ public class TimeTriggerPolicy implements ActiveTriggerPolicy, } } + @Override + public String toString() { + return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName() + + ")"; + } + } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java index 8d9f02817f0b2481415debca14aa8c8e0dfe85f3..08c49e90f10eaefc4ed5b08729449a5d4c8f0c1a 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java @@ -96,4 +96,9 @@ public class TumblingEvictionPolicy implements CloneableEvictionPolicy key = new ModKey(2); + Timestamp ts = new Timestamp() { + + private static final long serialVersionUID = 1L; + + @Override + public long getTimestamp(Integer value) { + return value; + } + }; + + source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream() .addSink(new DistributedSink1()); source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2)) .mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2()); + source.window(Count.of(2)).every(Count.of(3)).min(0).getDiscretizedStream() + .addSink(new CentralSink3()); + + source.groupBy(key).window(Time.of(4, ts, 
1)).max(0).getDiscretizedStream() + .addSink(new DistributedSink3()); + env.execute(); // sum ( Count of 2 slide 3 ) @@ -123,14 +142,13 @@ public class WindowIntegrationTest implements Serializable { validateOutput(expected2, CentralSink2.windows); - // groupby mod 3 sum ( Tumbling Count of 2) + // groupby mod 2 sum ( Tumbling Time of 4) List> expected3 = new ArrayList>(); expected3.add(StreamWindow.fromElements(4)); expected3.add(StreamWindow.fromElements(5)); - expected3.add(StreamWindow.fromElements(16)); + expected3.add(StreamWindow.fromElements(22)); + expected3.add(StreamWindow.fromElements(8)); expected3.add(StreamWindow.fromElements(10)); - expected3.add(StreamWindow.fromElements(11)); - expected3.add(StreamWindow.fromElements(3)); validateOutput(expected3, DistributedSink1.windows); @@ -146,6 +164,22 @@ public class WindowIntegrationTest implements Serializable { validateOutput(expected4, DistributedSink2.windows); + // min ( Count of 2 slide 3 ) + List> expected5 = new ArrayList>(); + expected5.add(StreamWindow.fromElements(2)); + expected5.add(StreamWindow.fromElements(4)); + expected5.add(StreamWindow.fromElements(11)); + + validateOutput(expected5, CentralSink3.windows); + + // groupby mod 2 max ( Tumbling Time of 4) + List> expected6 = new ArrayList>(); + expected6.add(StreamWindow.fromElements(3)); + expected6.add(StreamWindow.fromElements(5)); + expected6.add(StreamWindow.fromElements(11)); + expected6.add(StreamWindow.fromElements(4)); + expected6.add(StreamWindow.fromElements(10)); + validateOutput(expected6, DistributedSink3.windows); + } public static void validateOutput(List expected, List actual) { @@ -178,6 +212,19 @@ public class WindowIntegrationTest implements Serializable { } + @SuppressWarnings("serial") + private static class CentralSink3 implements SinkFunction> { + + public static List> windows = Collections + .synchronizedList(new ArrayList>()); + + @Override + public void invoke(StreamWindow value) throws Exception { + windows.add(value); + } + + } + @SuppressWarnings("serial") private 
static class DistributedSink1 implements SinkFunction> { @@ -203,4 +250,17 @@ public class WindowIntegrationTest implements Serializable { } } + + @SuppressWarnings("serial") + private static class DistributedSink3 implements SinkFunction> { + + public static List> windows = Collections + .synchronizedList(new ArrayList>()); + + @Override + public void invoke(StreamWindow value) throws Exception { + windows.add(value); + } + + } }