diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java index 03a24e7e9cc64a09a201fba88079ec5aafda867f..bac07322ff7c24bc879551cbff4f51ade2c91e36 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java @@ -53,6 +53,7 @@ public class StreamCollector implements Collector { // TODO reconsider emitting mechanism at timeout (find a place to timeout) @Override public void collect(T tuple) { + //TODO: move copy to StreamCollector2 streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple)); counter++; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java index 66bdf7ad6df87b118803e3ae5f2d5ab4fc474d3c..ae70b5a7b41d051d93719c99effe002bf11d4b78 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector2.java @@ -28,45 +28,50 @@ public class StreamCollector2 implements Collector { ArrayList> notPartitionedCollectors; ArrayList[]> partitionedCollectors; + List> partitionedOutputs; + List> notPartitionedOutputs; int keyPostition; - + // TODO consider channelID - public StreamCollector2(int[] batchSizesOfNotPartitioned, int[] batchSizesOfPartitioned, - int[] parallelismOfOutput, int keyPosition, long batchTimeout, int channelID, + public StreamCollector2(List batchSizesOfNotPartitioned, List batchSizesOfPartitioned, + List parallelismOfOutput, int keyPosition, long batchTimeout, int channelID, SerializationDelegate serializationDelegate, - List> outputs) { + List> partitionedOutputs, + List> notPartitionedOutputs) { notPartitionedCollectors = new ArrayList>( - batchSizesOfNotPartitioned.length); + batchSizesOfNotPartitioned.size()); partitionedCollectors = new ArrayList[]>( - batchSizesOfPartitioned.length); + batchSizesOfPartitioned.size()); this.keyPostition = keyPosition; - - for (int i = 0; i < batchSizesOfNotPartitioned.length; i++) { - notPartitionedCollectors.add(new StreamCollector(batchSizesOfNotPartitioned[i], - batchTimeout, channelID, serializationDelegate, outputs)); + + for (int i = 0; i < batchSizesOfNotPartitioned.size(); i++) { + notPartitionedCollectors.add(new StreamCollector(batchSizesOfNotPartitioned.get(i), + batchTimeout, channelID, serializationDelegate, notPartitionedOutputs)); } - for (int i = 0; i < batchSizesOfPartitioned.length; i++) { - StreamCollector[] collectors = new StreamCollector[parallelismOfOutput[i]]; + for (int i = 0; i < batchSizesOfPartitioned.size(); i++) { + StreamCollector[] collectors = new StreamCollector[parallelismOfOutput.get(i)]; for (int j = 0; j < collectors.length; j++) { - collectors[j] = new StreamCollector(batchSizesOfPartitioned[i], - batchTimeout, channelID, serializationDelegate, outputs); + List> output = new ArrayList>(); + output.add(partitionedOutputs.get(i)); + collectors[j] = new StreamCollector(batchSizesOfPartitioned.get(i), + batchTimeout, channelID, serializationDelegate, output); } partitionedCollectors.add(collectors); } } - + // TODO copy here instead of copying inside every StreamCollector @Override public void collect(T record) { for (StreamCollector collector : notPartitionedCollectors) { collector.collect(record); } - + int partitionHash = Math.abs(record.getField(keyPostition).hashCode()); - + for (StreamCollector[] collectors : partitionedCollectors) { collectors[partitionHash % collectors.length].collect(record); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index 11624ca00c4057bd2b9b73635f0d52d35735be99..fd2ca5aa79be3960d3bedf6ffaaf9247f09fa7ca 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -121,12 +121,11 @@ public final class StreamComponentHelper { int batchSize = taskConfiguration.getInteger("batchSize", 1); long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000); - collector = new StreamCollector(batchSize, batchTimeout, id, - outSerializationDelegate, outputs); +// collector = new StreamCollector(batchSize, batchTimeout, id, +// outSerializationDelegate, outputs); - // collector = new StreamCollector2(batchsizes_s.toArray(), - // batchsizes_f.toArray(),numOfOutputs_f , keyPosition, batchTimeout, - // id, outSerializationDelegate, outputs); + collector = new StreamCollector2(batchsizes_s, batchsizes_f, numOfOutputs_f, + keyPosition, batchTimeout, id, outSerializationDelegate, outputs_f, outputs_s); return collector; } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java index 7602463df27f7ab11989702607ebfd0ffedf3452..3e44c9cef9c3bf88438338fc639dd0d7027ecacc 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java @@ -69,9 +69,9 @@ public class BatchReduceTest { @Test public void test() throws Exception { - StreamExecutionEnvironment context = new StreamExecutionEnvironment(4, 1000); + StreamExecutionEnvironment context = new StreamExecutionEnvironment(); DataStream> dataStream0 = context.addSource(new MySource()) - .batchReduce(new MyBatchReduce()).addSink(new MySink()); + .batch(4).batchReduce(new MyBatchReduce()).addSink(new MySink()); context.execute(); } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java index 97dd6dadfe1f4ba8c355314fcf4f467bfbd487de..3c9f0de75b0062c8421494f02d53c7f302e3a57c 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollector2Test.java @@ -15,10 +15,15 @@ package eu.stratosphere.streaming.api; +import java.util.ArrayList; +import java.util.List; + import org.junit.Test; import eu.stratosphere.api.java.tuple.Tuple; import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.nephele.io.RecordWriter; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class StreamCollector2Test { @@ -26,14 +31,23 @@ public class StreamCollector2Test { @Test public void testCollect() { - int[] batchSizesOfNotPartitioned = new int[] {}; - int[] batchSizesOfPartitioned = new int[] {2, 2}; - int[] parallelismOfOutput = new int[] {2, 1}; + List batchSizesOfNotPartitioned = new ArrayList(); + List batchSizesOfPartitioned = new ArrayList(); + batchSizesOfPartitioned.add(2); + batchSizesOfPartitioned.add(2); + List parallelismOfOutput = new ArrayList(); + parallelismOfOutput.add(2); + parallelismOfOutput.add(2); int keyPosition = 0; long batchTimeout = 1000; int channelID = 1; - collector = new StreamCollector2(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, null); + List> fOut = new ArrayList>(); + + fOut.add(null); + fOut.add(null); + + collector = new StreamCollector2(batchSizesOfNotPartitioned, batchSizesOfPartitioned, parallelismOfOutput, keyPosition, batchTimeout, channelID, null, fOut,fOut); Tuple1 t = new Tuple1(); StreamCollector sc1 = new StreamCollector(1, batchTimeout, channelID, null);