From cb205d3a1f8d8fa9cc71f197d35b92d25e0a5623 Mon Sep 17 00:00:00 2001 From: gyfora Date: Mon, 14 Jul 2014 16:29:20 +0200 Subject: [PATCH] [streaming] StreamCollector2 added to streamcomponenthelper with fix --- .../streaming/api/StreamCollector.java | 1 + .../streaming/api/StreamCollector2.java | 39 +++++++++++-------- .../StreamComponentHelper.java | 9 ++--- .../streaming/api/BatchReduceTest.java | 4 +- .../streaming/api/StreamCollector2Test.java | 22 +++++++++-- 5 files changed, 47 insertions(+), 28 deletions(-) diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java index 03a24e7e9cc..bac07322ff7 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java @@ -53,6 +53,7 @@ public class StreamCollector implements Collector { // TODO reconsider emitting mechanism at timeout (find a place to timeout) @Override public void collect(T tuple) { + //TODO: move copy to StreamCollector2 streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple)); counter++; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java index 66bdf7ad6df..ae70b5a7b41 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java @@ -28,45 +28,50 @@ public class StreamCollector2 implements Collector { ArrayList> notPartitionedCollectors; ArrayList[]> partitionedCollectors; + List> partitionedOutputs; + List> notPartitionedOutputs; int keyPostition; - + // TODO consider channelID - public StreamCollector2(int[] batchSizesOfNotPartitioned, int[] batchSizesOfPartitioned, - int[] parallelismOfOutput, int keyPosition, long batchTimeout, int channelID, + public StreamCollector2(List batchSizesOfNotPartitioned, List batchSizesOfPartitioned, + List parallelismOfOutput, int keyPosition, long batchTimeout, int channelID, SerializationDelegate serializationDelegate, - List> outputs) { + List> partitionedOutputs, + List> notPartitionedOutputs) { notPartitionedCollectors = new ArrayList>( - batchSizesOfNotPartitioned.length); + batchSizesOfNotPartitioned.size()); partitionedCollectors = new ArrayList[]>( - batchSizesOfPartitioned.length); + batchSizesOfPartitioned.size()); this.keyPostition = keyPosition; - - for (int i = 0; i < batchSizesOfNotPartitioned.length; i++) { - notPartitionedCollectors.add(new StreamCollector(batchSizesOfNotPartitioned[i], - batchTimeout, channelID, serializationDelegate, outputs)); + + for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) { + notPartitionedCollectors.add(new StreamCollector(batchSizesOfNotPartitioned.get(i), + batchTimeout, channelID, serializationDelegate, notPartitionedOutputs)); } - for (int i = 0; i < batchSizesOfPartitioned.length; i++) { - StreamCollector[] collectors = new StreamCollector[parallelismOfOutput[i]]; + for (int i = 0; i < batchSizesOfPartitioned.size(); i++) { + StreamCollector[] collectors = new StreamCollector[parallelismOfOutput.get(i)]; for (int j = 0; j < collectors.length; j++) { - collectors[j] = new StreamCollector(batchSizesOfPartitioned[i], - batchTimeout, channelID, serializationDelegate, outputs); + List> output = new ArrayList>(); + output.add(partitionedOutputs.get(i)); + collectors[j] = new StreamCollector(batchSizesOfPartitioned.get(i), + batchTimeout, channelID, serializationDelegate, output); } partitionedCollectors.add(collectors); } } - + // TODO copy here instead of copying inside every StreamCollector @Override public void collect(T record) { for (StreamCollector collector : notPartitionedCollectors) { collector.collect(record); } - + int partitionHash = Math.abs(record.getField(keyPostition).hashCode()); - + for (StreamCollector[] collectors : partitionedCollectors) { collectors[partitionHash % collectors.length].collect(record); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index 11624ca00c4..fd2ca5aa79b 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -121,12 +121,11 @@ public final class StreamComponentHelper { int batchSize = taskConfiguration.getInteger("batchSize", 1); long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000); - collector = new StreamCollector(batchSize, batchTimeout, id, - outSerializationDelegate, outputs); +// collector = new StreamCollector(batchSize, batchTimeout, id, +// outSerializationDelegate, outputs); - // collector = new StreamCollector2(batchsizes_s.toArray(), - // batchsizes_f.toArray(),numOfOutputs_f , keyPosition, batchTimeout, - // id, outSerializationDelegate, outputs); + collector = new StreamCollector2(batchsizes_s, batchsizes_f, numOfOutputs_f, + keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s); return collector; } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java index 7602463df27..3e44c9cef9c 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java @@ -69,9 +69,9 @@ public class BatchReduceTest { @Test public void test() throws Exception { - StreamExecutionEnvironment context = new StreamExecutionEnvironment(4, 1000); + StreamExecutionEnvironment context = new StreamExecutionEnvironment(); DataStream> dataStream0 = context.addSource(new MySource()) - .batchReduce(new MyBatchReduce()).addSink(new MySink()); + .batch(4).batchReduce(new MyBatchReduce()).addSink(new MySink()); context.execute(); } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java index 97dd6dadfe1..3c9f0de75b0 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java @@ -15,10 +15,15 @@ package eu.stratosphere.streaming.api; +import java.util.ArrayList; +import java.util.List; + import org.junit.Test; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.nephele.io.RecordWriter; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class StreamCollector2Test { @@ -26,14 +31,23 @@ public class StreamCollector2Test { @Test public void testCollect() { - int[] batchSizesOfNotPartitioned = new int[] {}; - int[] batchSizesOfPartitioned = new int[] {2, 2}; - int[] parallelismOfOutput = new int[] {2, 1}; + List batchSizesOfNotPartitioned = new ArrayList(); + List batchSizesOfPartitioned = new ArrayList(); + batchSizesOfPartitioned.add(2); + batchSizesOfPartitioned.add(2); + List parallelismOfOutput = new ArrayList(); + parallelismOfOutput.add(2); + parallelismOfOutput.add(2); int keyPosition = 0; long batchTimeout = 1000; int channelID = 1; - collector = new StreamCollector2(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, null); + List> fOut = new ArrayList>(); + + fOut.add(null); + fOut.add(null); + + collector = new StreamCollector2(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut); Tuple1 t = new Tuple1(); StreamCollector sc1 = new StreamCollector(1, batchTimeout, channelID, null); -- GitLab