diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java
index eeacd90c1bf349367318b1a1d2adcff659e28749..4c414f8a5d026346ef2b046caa739a4e8bcc38ea 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java
@@ -94,7 +94,7 @@ public class DataStream {
 	public DataStream batch(int batchSize) {
 		return context.setBatchSize(this, batchSize);
 	}
-	
+
 	public DataStream flatMap(FlatMapFunction flatMapper) {
 		return context.addFunction("flatMap", this, flatMapper, new FlatMapInvokable(
 				flatMapper));
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
index 601e5c3b682101735ff73456716b48324e7ba39e..beb22a8b43ee3ea1da33d37c5b929e2eee49485e 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java
@@ -65,6 +65,7 @@ public class JobGraphBuilder {
 	protected int maxParallelism;
 	protected FaultToleranceType faultToleranceType;
 	private int batchSize;
+	private long batchTimeout;
 
 	/**
 	 * Creates a new JobGraph with the given name
@@ -99,9 +100,10 @@ public class JobGraphBuilder {
 		this(jobGraphName, FaultToleranceType.NONE);
 	}
 
-	public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, int batchSize) {
+	public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, int defaultBatchSize, long defaultBatchTimeoutMillis) {
 		this(jobGraphName, faultToleranceType);
-		this.batchSize = batchSize;
+		this.batchSize = defaultBatchSize;
+		this.batchTimeout = defaultBatchTimeoutMillis;
 	}
 
 	/**
@@ -262,6 +264,7 @@ public class JobGraphBuilder {
 		config.setClass("userfunction", InvokableClass);
 		config.setString("componentName", componentName);
 		config.setInteger("batchSize", batchSize);
+		config.setLong("batchTimeout", batchTimeout);
 		// config.setBytes("operator", getSerializedFunction());
 
 		config.setInteger("faultToleranceType", faultToleranceType.id);
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java
index 48be5d40e6594eae9c813b55630bde8640aaace5..57b47735434026d53af929d7ae37309d50e77a70 100755
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java
@@ -28,37 +28,54 @@ public class StreamCollector implements Collector {
 
 	protected StreamRecord streamRecord;
 	protected int batchSize;
+	protected long batchTimeout;
 	protected int counter = 0;
 	protected int channelID;
+	private long timeOfLastRecordEmitted = System.currentTimeMillis();;
 	private List> outputs;
 
-	public StreamCollector(int batchSize, int channelID,
+	public StreamCollector(int batchSize, long batchTimeout, int channelID,
 			SerializationDelegate serializationDelegate,
 			List> outputs) {
 		this.batchSize = batchSize;
+		this.batchTimeout = batchTimeout;
 		this.streamRecord = new ArrayStreamRecord(batchSize);
 		this.streamRecord.setSeralizationDelegate(serializationDelegate);
 		this.channelID = channelID;
 		this.outputs = outputs;
 	}
 
-	public StreamCollector(int batchSize, int channelID,
+	public StreamCollector(int batchSize, long batchTimeout, int channelID,
 			SerializationDelegate serializationDelegate) {
-		this(batchSize, channelID, serializationDelegate, null);
+		this(batchSize, batchTimeout, channelID, serializationDelegate, null);
 	}
 
+	// TODO reconsider emitting mechanism at timeout (find a place to timeout)
 	@Override
 	public void collect(T tuple) {
 		streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple));
 		counter++;
+
 		if (counter >= batchSize) {
-			counter = 0;
-			streamRecord.setId(channelID);
 			emit(streamRecord);
+			timeOfLastRecordEmitted = System.currentTimeMillis();
+		} else {
+			timeout();
 		}
 	}
 
+	public void timeout() {
+		if (timeOfLastRecordEmitted + batchTimeout < System.currentTimeMillis()) {
+			StreamRecord truncatedRecord = new ArrayStreamRecord(streamRecord, counter);
+			emit(truncatedRecord);
+			timeOfLastRecordEmitted = System.currentTimeMillis();
+		}
+	}
+
 	private void emit(StreamRecord streamRecord) {
+		counter = 0;
+		streamRecord.setId(channelID);
+
 		if (outputs == null) {
 			System.out.println(streamRecord);
 		} else {
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java
index 26587704c2c72d548dfae7992f184459d0d95c75..6a8356fbe24d69077776f6d554db39b9ead74c81 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java
@@ -33,15 +33,18 @@ import eu.stratosphere.util.Collector;
 public class StreamExecutionEnvironment {
 	JobGraphBuilder jobGraphBuilder;
 
-	public StreamExecutionEnvironment(int defaultBatchSize) {
+	public StreamExecutionEnvironment(int defaultBatchSize, long defaultBatchTimeoutMillis) {
 		if (defaultBatchSize < 1) {
 			throw new IllegalArgumentException("Batch size must be positive.");
 		}
-		jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE, defaultBatchSize);
+		if (defaultBatchTimeoutMillis < 1) {
+			throw new IllegalArgumentException("Batch timeout must be positive.");
+		}
+		jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE, defaultBatchSize, defaultBatchTimeoutMillis);
 	}
 
 	public StreamExecutionEnvironment() {
-		this(1);
+		this(1, 1000);
 	}
 
 	private static class DummySource extends UserSourceInvokable> {
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
index dee5a8b5d3e88f6205562c41bd6101a96a356f9d..c1accd392373fe78caa5e2eca29ae2ef0a386976 100644
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java
@@ -108,7 +108,9 @@ public final class StreamComponentHelper {
 	public StreamCollector setCollector(Configuration taskConfiguration, int id,
 			List> outputs) {
 		int batchSize = taskConfiguration.getInteger("batchSize", -1);
-		collector = new StreamCollector(batchSize, id, outSerializationDelegate, outputs);
+		long batchTimeout = taskConfiguration.getLong("batchTimeout", -1);
+		collector = new StreamCollector(batchSize, batchTimeout, id,
+				outSerializationDelegate, outputs);
 		return collector;
 	}
 
@@ -121,7 +123,7 @@ public final class StreamComponentHelper {
 		try {
 			ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(operatorBytes));
 			Object function = in.readObject();
-			
+
 			if (operatorName.equals("flatMap")) {
 				setSerializer(function, FlatMapFunction.class);
 			} else if (operatorName.equals("map")) {
@@ -151,21 +153,21 @@ public final class StreamComponentHelper {
 		}
 	}
-	
+
 	private void setSerializer(Object function, Class clazz) {
-		inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
-				function.getClass(), 0, null, null);
+		inTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
+				0, null, null);
 		inTupleSerializer = inTupleTypeInfo.createSerializer();
 		inDeserializationDelegate = new DeserializationDelegate(inTupleSerializer);
 
-		outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz,
-				function.getClass(), 1, null, null);
+		outTupleTypeInfo = (TupleTypeInfo) TypeExtractor.createTypeInfo(clazz, function.getClass(),
+				1, null, null);
 		outTupleSerializer = outTupleTypeInfo.createSerializer();
 		outSerializationDelegate = new SerializationDelegate(outTupleSerializer);
 	}
-	
+
 	public AbstractRecordReader getConfigInputs(T taskBase, Configuration taskConfiguration)
 			throws StreamComponentException {
 		int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java
index e8c53bb80146ddcb1d219f387f1742df8b63d3f5..3750a4b101090f42889303974f36bf36d833294a 100755
--- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java
+++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java
@@ -45,14 +45,19 @@ public class ArrayStreamRecord extends StreamRecord {
 	}
 
 	public ArrayStreamRecord(StreamRecord record) {
-		tupleBatch = new Tuple[record.getBatchSize()];
+		this(record, record.getBatchSize());
+	}
+
+	public ArrayStreamRecord(StreamRecord record, int truncatedSize) {
+		tupleBatch = new Tuple[truncatedSize];
 		this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
-		for (int i = 0; i < record.getBatchSize(); ++i) {
+		for (int i = 0; i < truncatedSize; ++i) {
 			this.tupleBatch[i] = copyTuple(record.getTuple(i));
 		}
 		this.batchSize = tupleBatch.length;
 	}
 
+	/**
 	 * Creates a new batch of records containing the given Tuple array as
 	 * elements
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java
index 958e0ee263c808335effd39c601eb5a41d23d083..c6e5f17738cef3695bb73685df7e9a320ef2b9a7 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java
@@ -72,7 +72,7 @@ public class BatchReduceTest {
 
 	@Test
 	public void test() throws Exception {
-		StreamExecutionEnvironment context = new StreamExecutionEnvironment(4);
+		StreamExecutionEnvironment context = new StreamExecutionEnvironment(4, 1000);
 
 		DataStream> dataStream0 = context.addSource(new MySource()).batchReduce(new MyBatchReduce()).addSink(new MySink());
 
 		context.execute();
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java
index 7550c08650b331d07204603084e0660b25605e3d..f69e51c5681029415a6cac2d5f4654d4b9c0c984 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java
@@ -47,10 +47,10 @@ public class BatchTest {
 		DataStream> dataStream = context
 				.addSource(new MySource())
-				.flatMap(new MyMap()).batch(2)
 				.flatMap(new MyMap()).batch(4)
 				.flatMap(new MyMap()).batch(2)
 				.flatMap(new MyMap()).batch(5)
+				.flatMap(new MyMap()).batch(4)
 				.addSink(new MySink());
 
 		context.execute();
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
index a5cc3cb521502dd9012f3d73a29d111d5eedf027..be785309456adfb7436569e3de76b5df08e4cc2e 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java
@@ -71,12 +71,17 @@ public class FlatMapTest {
 	public void test() throws Exception {
 
 		try {
-			StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0);
+			StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0, 1000);
 			fail();
 		} catch (IllegalArgumentException e) {
+			try {
+				StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(1, 0);
+				fail();
+			} catch (IllegalArgumentException e2) {
+			}
 		}
 
-		StreamExecutionEnvironment context = new StreamExecutionEnvironment(2);
+		StreamExecutionEnvironment context = new StreamExecutionEnvironment(2, 1000);
 		DataStream> dataStream0 = context.addSource(new MySource());
 
 		DataStream> dataStream1 = context.addDummySource().connectWith(dataStream0)
@@ -96,7 +101,7 @@ public class FlatMapTest {
 
 		FlatMapFunction f = (FlatMapFunction) in.readObject();
 
-		StreamCollector s = new StreamCollector(1, 1, null);
+		StreamCollector s = new StreamCollector<>(1, 1000, 1, null);
 		Tuple t = new Tuple1("asd");
 
 		f.flatMap(t, s);
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java
index fa433598c5d8af54b6a759739f8edfdc54c3e702..bee2e446d4f86aecd99c032d686977aded3e7116 100644
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java
@@ -72,7 +72,7 @@ public class MapTest {
 
 		MapFunction f = (MapFunction) in.readObject();
 
-		StreamCollector s = new StreamCollector(1, 1, null);
+		StreamCollector s = new StreamCollector(1, 1000, 1, null);
 		Tuple t = new Tuple1("asd");
 
 		s.collect(f.map(t));
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java
index a965d1641e2cc851c0300c0e23c4d06db5bded97..c3e9ccb045738fc25250e7de50f88bf86471f7d1 100755
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java
@@ -25,13 +25,13 @@ public class StreamCollectorTest {
 
 	@Test
 	public void testStreamCollector() {
-		StreamCollector collector = new StreamCollector(10, 0, null);
+		StreamCollector collector = new StreamCollector(10, 1000, 0, null);
 		assertEquals(10, collector.batchSize);
 	}
 
 	@Test
 	public void testCollect() {
-		StreamCollector collector = new StreamCollector(2, 0, null);
+		StreamCollector collector = new StreamCollector(2, 1000, 0, null);
 		collector.collect(new Tuple1(3));
 		collector.collect(new Tuple1(4));
 		collector.collect(new Tuple1(5));
@@ -39,6 +39,20 @@ public class StreamCollectorTest {
 
 	}
 
+	@Test
+	public void testBatchSize() throws InterruptedException {
+		System.out.println("---------------");
+		StreamCollector collector = new StreamCollector(3, 100, 0, null);
+		collector.collect(new Tuple1(0));
+		collector.collect(new Tuple1(0));
+		collector.collect(new Tuple1(0));
+
+		Thread.sleep(200);
+		collector.collect(new Tuple1(2));
+		collector.collect(new Tuple1(3));
+		System.out.println("---------------");
+	}
+
 	@Test
 	public void testClose() {
 	}
diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java
index 6633cfd9d2ed9a7bca038cb81e23b52d2b2a2419..6edd5682d226033a65433b9fdabbb3c23c16e63c 100755
--- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java
+++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java
@@ -97,4 +97,17 @@ public class ArrayStreamRecordTest {
 
 	}
 
+	@Test
+	public void truncatedSizeTest() {
+		StreamRecord record = new ArrayStreamRecord(4);
+		record.setTuple(0, new Tuple1(0));
+		record.setTuple(1, new Tuple1(1));
+		record.setTuple(2, new Tuple1(2));
+		record.setTuple(3, new Tuple1(3));
+
+		StreamRecord truncatedRecord = new ArrayStreamRecord(record, 2);
+		assertEquals(2, truncatedRecord.batchSize);
+		assertEquals(0, truncatedRecord.getTuple(0).getField(0));
+		assertEquals(1, truncatedRecord.getTuple(1).getField(0));
+	}
 }
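
The patch makes StreamCollector flush a batch either when it reaches batchSize or when batchTimeout milliseconds have passed since the last emit, with the timeout checked lazily on the next collect() call (hence the TODO in StreamCollector). Below is a minimal, self-contained sketch of that size-or-timeout policy; the class name BatchingCollectorSketch and its members are illustrative only and are not part of the Stratosphere API.

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the size-or-timeout flush policy added to StreamCollector;
// it does not depend on the Stratosphere streaming classes.
public class BatchingCollectorSketch<T> {

	private final int batchSize;
	private final long batchTimeoutMillis;
	private final List<T> buffer = new ArrayList<T>();
	private long timeOfLastEmit = System.currentTimeMillis();

	public BatchingCollectorSketch(int batchSize, long batchTimeoutMillis) {
		this.batchSize = batchSize;
		this.batchTimeoutMillis = batchTimeoutMillis;
	}

	public void collect(T element) {
		buffer.add(element);
		if (buffer.size() >= batchSize) {
			// Full batch: emit everything buffered so far.
			emit();
		} else if (timeOfLastEmit + batchTimeoutMillis < System.currentTimeMillis()) {
			// Timeout elapsed: emit a partially filled ("truncated") batch,
			// analogous to new ArrayStreamRecord(streamRecord, counter) in the patch.
			emit();
		}
	}

	private void emit() {
		System.out.println("emitting " + buffer.size() + " element(s): " + buffer);
		buffer.clear();
		timeOfLastEmit = System.currentTimeMillis();
	}

	public static void main(String[] args) throws InterruptedException {
		BatchingCollectorSketch<Integer> collector = new BatchingCollectorSketch<Integer>(3, 100);
		collector.collect(0);
		collector.collect(0);
		collector.collect(0); // third element fills the batch, so it is emitted here

		Thread.sleep(200);
		collector.collect(2); // timeout has elapsed, so a one-element batch is emitted
		collector.collect(3); // stays buffered until the batch fills or the timeout elapses again
	}
}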