diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java index 8221d39702b977ead8c579b7e31a12e6dc7e6909..731b7969d967035e9f70a3e3f4530b407011a706 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java @@ -74,6 +74,11 @@ public class DataStream { } public DataStream batch(int batchSize) { + + if (batchSize < 1) { + throw new IllegalArgumentException("Batch size must be positive."); + } + for (int i = 0; i < batchSizes.size(); i++) { batchSizes.set(i, batchSize); } @@ -85,6 +90,7 @@ public class DataStream { connectIDs.addAll(stream.connectIDs); ctypes.addAll(stream.ctypes); cparams.addAll(stream.cparams); + batchSizes.addAll(stream.batchSizes); return this; } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java index de9fdde98ed5f5c487ef38223ddea5e820fab898..692efc2b72dc4764806463fdf72ffaca52cfce8c 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java @@ -64,6 +64,8 @@ public class JobGraphBuilder { protected String maxParallelismVertexName; protected int maxParallelism; protected FaultToleranceType faultToleranceType; + private int batchSize; + private long batchTimeout; /** * Creates a new JobGraph with the given name @@ -97,8 +99,11 @@ public class JobGraphBuilder { this(jobGraphName, FaultToleranceType.NONE); } - public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, int batchSize) { + public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, + int defaultBatchSize, long defaultBatchTimeoutMillis) { this(jobGraphName, faultToleranceType); + this.batchSize = defaultBatchSize; + this.batchTimeout = defaultBatchTimeoutMillis; } /** @@ -244,7 +249,6 @@ public class JobGraphBuilder { * @param component * AbstractJobVertex associated with the component */ - private Configuration setComponent(String componentName, final Class InvokableClass, int parallelism, int subtasksPerInstance, AbstractJobVertex component) { @@ -259,6 +263,8 @@ public class JobGraphBuilder { Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration(); config.setClass("userfunction", InvokableClass); config.setString("componentName", componentName); + config.setInteger("batchSize", batchSize); + config.setLong("batchTimeout", batchTimeout); // config.setBytes("operator", getSerializedFunction()); config.setInteger("faultToleranceType", faultToleranceType.id); @@ -268,12 +274,6 @@ public class JobGraphBuilder { return config; } - public void setBatchSize(String componentName, int batchSize) { - AbstractJobVertex component = components.get(componentName); - Configuration config = component.getConfiguration(); - config.setInteger("batchSize", batchSize); - } - private Configuration setComponent(String componentName, UserSourceInvokable InvokableObject, int parallelism, int subtasksPerInstance, AbstractJobVertex component) { @@ -304,6 +304,11 @@ public class JobGraphBuilder { return config; } + public void setBatchSize(String componentName, int batchSize) { + Configuration config = components.get(componentName).getConfiguration(); + config.setInteger("batchSize", batchSize); + } + /** * Adds serialized invokable object to the JobVertex configuration * diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java index 48be5d40e6594eae9c813b55630bde8640aaace5..03a24e7e9cc64a09a201fba88079ec5aafda867f 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java @@ -28,37 +28,54 @@ public class StreamCollector implements Collector { protected StreamRecord streamRecord; protected int batchSize; + protected long batchTimeout; protected int counter = 0; protected int channelID; + private long timeOfLastRecordEmitted = System.currentTimeMillis();; private List> outputs; - public StreamCollector(int batchSize, int channelID, + public StreamCollector(int batchSize, long batchTimeout, int channelID, SerializationDelegate serializationDelegate, List> outputs) { this.batchSize = batchSize; + this.batchTimeout = batchTimeout; this.streamRecord = new ArrayStreamRecord(batchSize); this.streamRecord.setSeralizationDelegate(serializationDelegate); this.channelID = channelID; this.outputs = outputs; } - public StreamCollector(int batchSize, int channelID, + public StreamCollector(int batchSize, long batchTimeout, int channelID, SerializationDelegate serializationDelegate) { - this(batchSize, channelID, serializationDelegate, null); + this(batchSize, batchTimeout, channelID, serializationDelegate, null); } + // TODO reconsider emitting mechanism at timeout (find a place to timeout) @Override public void collect(T tuple) { streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple)); counter++; + if (counter >= batchSize) { - counter = 0; - streamRecord.setId(channelID); emit(streamRecord); + // timeOfLastRecordEmitted = System.currentTimeMillis(); + } else { + // timeout(); + } + } + + public void timeout() { + if (timeOfLastRecordEmitted + batchTimeout < System.currentTimeMillis()) { + StreamRecord truncatedRecord = new ArrayStreamRecord(streamRecord, counter); + emit(truncatedRecord); + timeOfLastRecordEmitted = System.currentTimeMillis(); } } private void emit(StreamRecord streamRecord) { + counter = 0; + streamRecord.setId(channelID); + if (outputs == null) { System.out.println(streamRecord); } else { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java index 90d4c62764ba1c65bb082139a929108bbdd7cdfc..9b64232e6ce3742282e8f9ad21f1530468920431 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java @@ -33,9 +33,19 @@ import eu.stratosphere.util.Collector; public class StreamExecutionEnvironment { JobGraphBuilder jobGraphBuilder; - public StreamExecutionEnvironment() { - jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE); + public StreamExecutionEnvironment(int defaultBatchSize, long defaultBatchTimeoutMillis) { + if (defaultBatchSize < 1) { + throw new IllegalArgumentException("Batch size must be positive."); + } + if (defaultBatchTimeoutMillis < 1) { + throw new IllegalArgumentException("Batch timeout must be positive."); + } + jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE, + defaultBatchSize, defaultBatchTimeoutMillis); + } + public StreamExecutionEnvironment() { + this(1, 1000); } private static class DummySource extends UserSourceInvokable> { @@ -54,6 +64,7 @@ public class StreamExecutionEnvironment { } public void setBatchSize(DataStream inputStream) { + for (int i = 0; i < inputStream.connectIDs.size(); i++) { jobGraphBuilder.setBatchSize(inputStream.connectIDs.get(i), inputStream.batchSizes.get(i)); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index 702a7d1041b0119db26c2114f5c9df3fdf556da0..e413b50bf4784b689ddc4cf9d93348053ab21571 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -107,8 +107,11 @@ public final class StreamComponentHelper { public StreamCollector setCollector(Configuration taskConfiguration, int id, List> outputs) { + int batchSize = taskConfiguration.getInteger("batchSize", 1); - collector = new StreamCollector(batchSize, id, outSerializationDelegate, outputs); + long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000); + collector = new StreamCollector(batchSize, batchTimeout, id, + outSerializationDelegate, outputs); return collector; } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java index e8c53bb80146ddcb1d219f387f1742df8b63d3f5..3750a4b101090f42889303974f36bf36d833294a 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java @@ -45,14 +45,19 @@ public class ArrayStreamRecord extends StreamRecord { } public ArrayStreamRecord(StreamRecord record) { - tupleBatch = new Tuple[record.getBatchSize()]; + this(record, record.getBatchSize()); + } + + public ArrayStreamRecord(StreamRecord record, int truncatedSize) { + tupleBatch = new Tuple[truncatedSize]; this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20)); - for (int i = 0; i < record.getBatchSize(); ++i) { + for (int i = 0; i < truncatedSize; ++i) { this.tupleBatch[i] = copyTuple(record.getTuple(i)); } this.batchSize = tupleBatch.length; } + /** * Creates a new batch of records containing the given Tuple array as * elements diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java index 0dd7bcff2f0ff7893314c71a1cac6e5b859171ee..7602463df27f7ab11989702607ebfd0ffedf3452 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java @@ -15,13 +15,10 @@ package eu.stratosphere.streaming.api; -import static org.junit.Assert.fail; - import java.util.Iterator; import org.junit.Test; -import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.util.Collector; @@ -72,8 +69,8 @@ public class BatchReduceTest { @Test public void test() throws Exception { - StreamExecutionEnvironment context = new StreamExecutionEnvironment(); - DataStream> dataStream0 = context.addSource(new MySource()).batch(4) + StreamExecutionEnvironment context = new StreamExecutionEnvironment(4, 1000); + DataStream> dataStream0 = context.addSource(new MySource()) .batchReduce(new MyBatchReduce()).addSink(new MySink()); context.execute(); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f69e51c5681029415a6cac2d5f4654d4b9c0c984 --- /dev/null +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java @@ -0,0 +1,60 @@ +package eu.stratosphere.streaming.api; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.util.Collector; + +public class BatchTest { + + private static int count = 0; + + private static final class MySource extends SourceFunction> { + + private Tuple1 outTuple = new Tuple1(); + + @Override + public void invoke(Collector> collector) throws Exception { + for (int i = 0; i < 20; i++) { + outTuple.f0 = "string #" + i; + collector.collect(outTuple); + } + } + } + + private static final class MyMap extends FlatMapFunction, Tuple1> { + + @Override + public void flatMap(Tuple1 value, Collector> out) throws Exception { + out.collect(value); + } + } + + private static final class MySink extends SinkFunction> { + + @Override + public void invoke(Tuple1 tuple) { + count++; + } + } + + @Test + public void test() throws Exception { + StreamExecutionEnvironment context = new StreamExecutionEnvironment(); + + DataStream> dataStream = context + .addSource(new MySource()) + .flatMap(new MyMap()).batch(4) + .flatMap(new MyMap()).batch(2) + .flatMap(new MyMap()).batch(5) + .flatMap(new MyMap()).batch(4) + .addSink(new MySink()); + + context.execute(); + + assertEquals(20, count); + } +} diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java index 7040d44def71c4ddc024f4bb4ec95d41ad6d8046..eaf18c77b0eb10c36924411f0ca948baf290ab64 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java @@ -70,7 +70,18 @@ public class FlatMapTest { @Test public void test() throws Exception { - StreamExecutionEnvironment context = new StreamExecutionEnvironment(); + try { + StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0, 1000); + fail(); + } catch (IllegalArgumentException e) { + try { + StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(1, 0); + fail(); + } catch (IllegalArgumentException e2) { + } + } + + StreamExecutionEnvironment context = new StreamExecutionEnvironment(2, 1000); DataStream> dataStream0 = context.addSource(new MySource()); DataStream> dataStream1 = context.addDummySource().connectWith(dataStream0) @@ -90,7 +101,7 @@ public class FlatMapTest { FlatMapFunction f = (FlatMapFunction) in.readObject(); - StreamCollector s = new StreamCollector(1, 1, null); + StreamCollector s = new StreamCollector(1, 1000, 1, null); Tuple t = new Tuple1("asd"); f.flatMap(t, s); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java index fa433598c5d8af54b6a759739f8edfdc54c3e702..bee2e446d4f86aecd99c032d686977aded3e7116 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java @@ -72,7 +72,7 @@ public class MapTest { MapFunction f = (MapFunction) in.readObject(); - StreamCollector s = new StreamCollector(1, 1, null); + StreamCollector s = new StreamCollector(1, 1000, 1, null); Tuple t = new Tuple1("asd"); s.collect(f.map(t)); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java index a965d1641e2cc851c0300c0e23c4d06db5bded97..c3e9ccb045738fc25250e7de50f88bf86471f7d1 100755 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java @@ -25,13 +25,13 @@ public class StreamCollectorTest { @Test public void testStreamCollector() { - StreamCollector collector = new StreamCollector(10, 0, null); + StreamCollector collector = new StreamCollector(10, 1000, 0, null); assertEquals(10, collector.batchSize); } @Test public void testCollect() { - StreamCollector collector = new StreamCollector(2, 0, null); + StreamCollector collector = new StreamCollector(2, 1000, 0, null); collector.collect(new Tuple1(3)); collector.collect(new Tuple1(4)); collector.collect(new Tuple1(5)); @@ -39,6 +39,20 @@ public class StreamCollectorTest { } + @Test + public void testBatchSize() throws InterruptedException { + System.out.println("---------------"); + StreamCollector collector = new StreamCollector(3, 100, 0, null); + collector.collect(new Tuple1(0)); + collector.collect(new Tuple1(0)); + collector.collect(new Tuple1(0)); + + Thread.sleep(200); + collector.collect(new Tuple1(2)); + collector.collect(new Tuple1(3)); + System.out.println("---------------"); + } + @Test public void testClose() { } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java index 6633cfd9d2ed9a7bca038cb81e23b52d2b2a2419..6edd5682d226033a65433b9fdabbb3c23c16e63c 100755 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java @@ -97,4 +97,17 @@ public class ArrayStreamRecordTest { } + @Test + public void truncatedSizeTest() { + StreamRecord record = new ArrayStreamRecord(4); + record.setTuple(0, new Tuple1(0)); + record.setTuple(1, new Tuple1(1)); + record.setTuple(2, new Tuple1(2)); + record.setTuple(3, new Tuple1(3)); + + StreamRecord truncatedRecord = new ArrayStreamRecord(record, 2); + assertEquals(2, truncatedRecord.batchSize); + assertEquals(0, truncatedRecord.getTuple(0).getField(0)); + assertEquals(1, truncatedRecord.getTuple(1).getField(0)); + } }