From 108eba3ac94e3199583e0472c11c74022d38e250 Mon Sep 17 00:00:00 2001 From: gyfora Date: Mon, 14 Jul 2014 16:29:18 +0200 Subject: [PATCH] [streaming] connectWith fix --- .../streaming/api/DataStream.java | 6 ++ .../streaming/api/JobGraphBuilder.java | 21 ++++--- .../streaming/api/StreamCollector.java | 27 +++++++-- .../api/StreamExecutionEnvironment.java | 15 ++++- .../StreamComponentHelper.java | 5 +- .../api/streamrecord/ArrayStreamRecord.java | 9 ++- .../streaming/api/BatchReduceTest.java | 7 +-- .../stratosphere/streaming/api/BatchTest.java | 60 +++++++++++++++++++ .../streaming/api/FlatMapTest.java | 15 ++++- .../stratosphere/streaming/api/MapTest.java | 2 +- .../streaming/api/StreamCollectorTest.java | 18 +++++- .../streamrecord/ArrayStreamRecordTest.java | 13 ++++ 12 files changed, 170 insertions(+), 28 deletions(-) create mode 100644 flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java index 8221d39702b..731b7969d96 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/DataStream.java @@ -74,6 +74,11 @@ public class DataStream { } public DataStream batch(int batchSize) { + + if (batchSize < 1) { + throw new IllegalArgumentException("Batch size must be positive."); + } + for (int i = 0; i < batchSizes.size(); i++) { batchSizes.set(i, batchSize); } @@ -85,6 +90,7 @@ public class DataStream { connectIDs.addAll(stream.connectIDs); ctypes.addAll(stream.ctypes); cparams.addAll(stream.cparams); + batchSizes.addAll(stream.batchSizes); return this; } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java index de9fdde98ed..692efc2b72d 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java @@ -64,6 +64,8 @@ public class JobGraphBuilder { protected String maxParallelismVertexName; protected int maxParallelism; protected FaultToleranceType faultToleranceType; + private int batchSize; + private long batchTimeout; /** * Creates a new JobGraph with the given name @@ -97,8 +99,11 @@ public class JobGraphBuilder { this(jobGraphName, FaultToleranceType.NONE); } - public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, int batchSize) { + public JobGraphBuilder(String jobGraphName, FaultToleranceType faultToleranceType, + int defaultBatchSize, long defaultBatchTimeoutMillis) { this(jobGraphName, faultToleranceType); + this.batchSize = defaultBatchSize; + this.batchTimeout = defaultBatchTimeoutMillis; } /** @@ -244,7 +249,6 @@ public class JobGraphBuilder { * @param component * AbstractJobVertex associated with the component */ - private Configuration setComponent(String componentName, final Class InvokableClass, int parallelism, int subtasksPerInstance, AbstractJobVertex component) { @@ -259,6 +263,8 @@ public class JobGraphBuilder { Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration(); config.setClass("userfunction", InvokableClass); config.setString("componentName", componentName); + 
config.setInteger("batchSize", batchSize); + config.setLong("batchTimeout", batchTimeout); // config.setBytes("operator", getSerializedFunction()); config.setInteger("faultToleranceType", faultToleranceType.id); @@ -268,12 +274,6 @@ public class JobGraphBuilder { return config; } - public void setBatchSize(String componentName, int batchSize) { - AbstractJobVertex component = components.get(componentName); - Configuration config = component.getConfiguration(); - config.setInteger("batchSize", batchSize); - } - private Configuration setComponent(String componentName, UserSourceInvokable InvokableObject, int parallelism, int subtasksPerInstance, AbstractJobVertex component) { @@ -304,6 +304,11 @@ public class JobGraphBuilder { return config; } + public void setBatchSize(String componentName, int batchSize) { + Configuration config = components.get(componentName).getConfiguration(); + config.setInteger("batchSize", batchSize); + } + /** * Adds serialized invokable object to the JobVertex configuration * diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java index 48be5d40e65..03a24e7e9cc 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamCollector.java @@ -28,37 +28,54 @@ public class StreamCollector implements Collector { protected StreamRecord streamRecord; protected int batchSize; + protected long batchTimeout; protected int counter = 0; protected int channelID; + private long timeOfLastRecordEmitted = System.currentTimeMillis();; private List> outputs; - public StreamCollector(int batchSize, int channelID, + public StreamCollector(int batchSize, long batchTimeout, int channelID, SerializationDelegate serializationDelegate, List> outputs) { this.batchSize = batchSize; + this.batchTimeout = batchTimeout; this.streamRecord = new ArrayStreamRecord(batchSize); this.streamRecord.setSeralizationDelegate(serializationDelegate); this.channelID = channelID; this.outputs = outputs; } - public StreamCollector(int batchSize, int channelID, + public StreamCollector(int batchSize, long batchTimeout, int channelID, SerializationDelegate serializationDelegate) { - this(batchSize, channelID, serializationDelegate, null); + this(batchSize, batchTimeout, channelID, serializationDelegate, null); } + // TODO reconsider emitting mechanism at timeout (find a place to timeout) @Override public void collect(T tuple) { streamRecord.setTuple(counter, StreamRecord.copyTuple(tuple)); counter++; + if (counter >= batchSize) { - counter = 0; - streamRecord.setId(channelID); emit(streamRecord); + // timeOfLastRecordEmitted = System.currentTimeMillis(); + } else { + // timeout(); + } + } + + public void timeout() { + if (timeOfLastRecordEmitted + batchTimeout < System.currentTimeMillis()) { + StreamRecord truncatedRecord = new ArrayStreamRecord(streamRecord, counter); + emit(truncatedRecord); + timeOfLastRecordEmitted = System.currentTimeMillis(); } } private void emit(StreamRecord streamRecord) { + counter = 0; + streamRecord.setId(channelID); + if (outputs == null) { System.out.println(streamRecord); } else { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java index 
90d4c62764b..9b64232e6ce 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamExecutionEnvironment.java @@ -33,9 +33,19 @@ import eu.stratosphere.util.Collector; public class StreamExecutionEnvironment { JobGraphBuilder jobGraphBuilder; - public StreamExecutionEnvironment() { - jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE); + public StreamExecutionEnvironment(int defaultBatchSize, long defaultBatchTimeoutMillis) { + if (defaultBatchSize < 1) { + throw new IllegalArgumentException("Batch size must be positive."); + } + if (defaultBatchTimeoutMillis < 1) { + throw new IllegalArgumentException("Batch timeout must be positive."); + } + jobGraphBuilder = new JobGraphBuilder("jobGraph", FaultToleranceType.NONE, + defaultBatchSize, defaultBatchTimeoutMillis); + } + public StreamExecutionEnvironment() { + this(1, 1000); } private static class DummySource extends UserSourceInvokable> { @@ -54,6 +64,7 @@ public class StreamExecutionEnvironment { } public void setBatchSize(DataStream inputStream) { + for (int i = 0; i < inputStream.connectIDs.size(); i++) { jobGraphBuilder.setBatchSize(inputStream.connectIDs.get(i), inputStream.batchSizes.get(i)); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java index 702a7d1041b..e413b50bf47 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamComponentHelper.java @@ -107,8 +107,11 @@ public final class StreamComponentHelper { public StreamCollector setCollector(Configuration taskConfiguration, int id, List> outputs) { + int batchSize = taskConfiguration.getInteger("batchSize", 1); - collector = new StreamCollector(batchSize, id, outSerializationDelegate, outputs); + long batchTimeout = taskConfiguration.getLong("batchTimeout", 1000); + collector = new StreamCollector(batchSize, batchTimeout, id, + outSerializationDelegate, outputs); return collector; } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java index e8c53bb8014..3750a4b1010 100755 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecord.java @@ -45,14 +45,19 @@ public class ArrayStreamRecord extends StreamRecord { } public ArrayStreamRecord(StreamRecord record) { - tupleBatch = new Tuple[record.getBatchSize()]; + this(record, record.getBatchSize()); + } + + public ArrayStreamRecord(StreamRecord record, int truncatedSize) { + tupleBatch = new Tuple[truncatedSize]; this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20)); - for (int i = 0; i < record.getBatchSize(); ++i) { + for (int i = 0; i < truncatedSize; ++i) { this.tupleBatch[i] = copyTuple(record.getTuple(i)); } this.batchSize = tupleBatch.length; } + /** * Creates a new batch of records containing the given Tuple array as * elements diff --git 
a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java index 0dd7bcff2f0..7602463df27 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchReduceTest.java @@ -15,13 +15,10 @@ package eu.stratosphere.streaming.api; -import static org.junit.Assert.fail; - import java.util.Iterator; import org.junit.Test; -import eu.stratosphere.api.java.functions.FlatMapFunction; import eu.stratosphere.api.java.functions.GroupReduceFunction; import eu.stratosphere.api.java.tuple.Tuple1; import eu.stratosphere.util.Collector; @@ -72,8 +69,8 @@ public class BatchReduceTest { @Test public void test() throws Exception { - StreamExecutionEnvironment context = new StreamExecutionEnvironment(); - DataStream> dataStream0 = context.addSource(new MySource()).batch(4) + StreamExecutionEnvironment context = new StreamExecutionEnvironment(4, 1000); + DataStream> dataStream0 = context.addSource(new MySource()) .batchReduce(new MyBatchReduce()).addSink(new MySink()); context.execute(); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java new file mode 100644 index 00000000000..f69e51c5681 --- /dev/null +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/BatchTest.java @@ -0,0 +1,60 @@ +package eu.stratosphere.streaming.api; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.tuple.Tuple1; +import eu.stratosphere.util.Collector; + +public class BatchTest { + + private static int count = 0; + + private static final class MySource extends SourceFunction> { + + private Tuple1 outTuple = new Tuple1(); + + @Override + public void invoke(Collector> collector) throws Exception { + for (int i = 0; i < 20; i++) { + outTuple.f0 = "string #" + i; + collector.collect(outTuple); + } + } + } + + private static final class MyMap extends FlatMapFunction, Tuple1> { + + @Override + public void flatMap(Tuple1 value, Collector> out) throws Exception { + out.collect(value); + } + } + + private static final class MySink extends SinkFunction> { + + @Override + public void invoke(Tuple1 tuple) { + count++; + } + } + + @Test + public void test() throws Exception { + StreamExecutionEnvironment context = new StreamExecutionEnvironment(); + + DataStream> dataStream = context + .addSource(new MySource()) + .flatMap(new MyMap()).batch(4) + .flatMap(new MyMap()).batch(2) + .flatMap(new MyMap()).batch(5) + .flatMap(new MyMap()).batch(4) + .addSink(new MySink()); + + context.execute(); + + assertEquals(20, count); + } +} diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java index 7040d44def7..eaf18c77b0e 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FlatMapTest.java @@ -70,7 +70,18 @@ public class FlatMapTest { @Test public void test() throws Exception { - StreamExecutionEnvironment context = new StreamExecutionEnvironment(); + try { + 
StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(0, 1000); + fail(); + } catch (IllegalArgumentException e) { + try { + StreamExecutionEnvironment context2 = new StreamExecutionEnvironment(1, 0); + fail(); + } catch (IllegalArgumentException e2) { + } + } + + StreamExecutionEnvironment context = new StreamExecutionEnvironment(2, 1000); DataStream> dataStream0 = context.addSource(new MySource()); DataStream> dataStream1 = context.addDummySource().connectWith(dataStream0) @@ -90,7 +101,7 @@ public class FlatMapTest { FlatMapFunction f = (FlatMapFunction) in.readObject(); - StreamCollector s = new StreamCollector(1, 1, null); + StreamCollector s = new StreamCollector(1, 1000, 1, null); Tuple t = new Tuple1("asd"); f.flatMap(t, s); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java index fa433598c5d..bee2e446d4f 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/MapTest.java @@ -72,7 +72,7 @@ public class MapTest { MapFunction f = (MapFunction) in.readObject(); - StreamCollector s = new StreamCollector(1, 1, null); + StreamCollector s = new StreamCollector(1, 1000, 1, null); Tuple t = new Tuple1("asd"); s.collect(f.map(t)); diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java index a965d1641e2..c3e9ccb0457 100755 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/StreamCollectorTest.java @@ -25,13 +25,13 @@ public class StreamCollectorTest { @Test public void testStreamCollector() { - StreamCollector collector = new StreamCollector(10, 0, null); + StreamCollector collector = new StreamCollector(10, 1000, 0, null); assertEquals(10, collector.batchSize); } @Test public void testCollect() { - StreamCollector collector = new StreamCollector(2, 0, null); + StreamCollector collector = new StreamCollector(2, 1000, 0, null); collector.collect(new Tuple1(3)); collector.collect(new Tuple1(4)); collector.collect(new Tuple1(5)); @@ -39,6 +39,20 @@ public class StreamCollectorTest { } + @Test + public void testBatchSize() throws InterruptedException { + System.out.println("---------------"); + StreamCollector collector = new StreamCollector(3, 100, 0, null); + collector.collect(new Tuple1(0)); + collector.collect(new Tuple1(0)); + collector.collect(new Tuple1(0)); + + Thread.sleep(200); + collector.collect(new Tuple1(2)); + collector.collect(new Tuple1(3)); + System.out.println("---------------"); + } + @Test public void testClose() { } diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java index 6633cfd9d2e..6edd5682d22 100755 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/streamrecord/ArrayStreamRecordTest.java @@ -97,4 +97,17 @@ public class ArrayStreamRecordTest { } + @Test + public void truncatedSizeTest() { 
+ StreamRecord record = new ArrayStreamRecord(4); + record.setTuple(0, new Tuple1(0)); + record.setTuple(1, new Tuple1(1)); + record.setTuple(2, new Tuple1(2)); + record.setTuple(3, new Tuple1(3)); + + StreamRecord truncatedRecord = new ArrayStreamRecord(record, 2); + assertEquals(2, truncatedRecord.batchSize); + assertEquals(0, truncatedRecord.getTuple(0).getField(0)); + assertEquals(1, truncatedRecord.getTuple(1).getField(0)); + } } -- GitLab
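
What the connectWith fix addresses: DataStream tracks each connected input through parallel lists (connectIDs, ctypes, cparams, batchSizes), and before this patch connectWith() copied every list except batchSizes, leaving the lists out of step. A minimal standalone sketch of that idea follows; the class and field names only mirror the patched DataStream and are not the actual API.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch only: parallel per-input lists, as DataStream keeps them.
    class ConnectedStreamInfo {
        List<String> connectIDs = new ArrayList<String>();
        List<Integer> batchSizes = new ArrayList<Integer>();

        ConnectedStreamInfo connectWith(ConnectedStreamInfo other) {
            connectIDs.addAll(other.connectIDs);
            batchSizes.addAll(other.batchSizes); // the line this patch adds
            return this;
        }

        void batch(int batchSize) {
            if (batchSize < 1) {
                throw new IllegalArgumentException("Batch size must be positive.");
            }
            // Apply the batch size to every connected input, as DataStream.batch() does.
            for (int i = 0; i < batchSizes.size(); i++) {
                batchSizes.set(i, batchSize);
            }
        }
    }

With batchSizes merged as well, DataStream.batch() and StreamExecutionEnvironment.setBatchSize() can index connectIDs and batchSizes in lockstep, which is exactly how setBatchSize() iterates them in this patch.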
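
The other half of the patch threads a batchTimeout alongside batchSize through JobGraphBuilder, StreamCollector and StreamComponentHelper, so that a partially filled batch can be flushed after a configurable delay instead of waiting for batchSize elements (the emission-on-timeout call is still commented out in collect(), with timeout() left as a hook). A self-contained sketch of that mechanism, assuming nothing from the Stratosphere API; BatchingCollector and emit() are illustrative names, not the patched classes.

    import java.util.ArrayList;
    import java.util.List;

    // Minimal standalone sketch of batching with a timeout, mirroring the idea
    // behind StreamCollector.collect()/timeout() in this patch.
    public class BatchingCollector<T> {

        private final int batchSize;            // emit when this many elements are buffered
        private final long batchTimeoutMillis;  // ...or when the buffer is older than this
        private final List<T> buffer = new ArrayList<T>();
        private long timeOfLastEmit = System.currentTimeMillis();

        public BatchingCollector(int batchSize, long batchTimeoutMillis) {
            if (batchSize < 1) {
                throw new IllegalArgumentException("Batch size must be positive.");
            }
            if (batchTimeoutMillis < 1) {
                throw new IllegalArgumentException("Batch timeout must be positive.");
            }
            this.batchSize = batchSize;
            this.batchTimeoutMillis = batchTimeoutMillis;
        }

        public void collect(T element) {
            buffer.add(element);
            if (buffer.size() >= batchSize) {
                emit();
            }
        }

        // Intended to be called periodically by the owner; flushes a partial batch
        // if the last emission is older than the timeout.
        public void timeout() {
            if (!buffer.isEmpty()
                    && timeOfLastEmit + batchTimeoutMillis < System.currentTimeMillis()) {
                emit();
            }
        }

        private void emit() {
            System.out.println("emitting batch of " + buffer.size() + ": " + buffer);
            buffer.clear();
            timeOfLastEmit = System.currentTimeMillis();
        }
    }

In the patch itself, where timeout() should be invoked is still open; the TODO in StreamCollector.collect() ("find a place to timeout") marks that decision as deferred.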
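
Flushing on timeout needs a record shorter than batchSize, which is what the new ArrayStreamRecord(StreamRecord record, int truncatedSize) constructor provides: it copies only the first truncatedSize tuples and sets the batch size accordingly, as exercised by truncatedSizeTest(). A tiny sketch of the same idea using plain arrays; copyBatch is an illustrative helper, not part of the API.

    import java.util.Arrays;

    // Sketch of the truncating copy: keep only the first 'truncatedSize' elements
    // of a partially filled batch before emitting it.
    public final class TruncateExample {

        static Object[] copyBatch(Object[] batch, int truncatedSize) {
            // Arrays.copyOf keeps elements [0, truncatedSize) and shrinks the length,
            // which is what ArrayStreamRecord(record, truncatedSize) does tuple by tuple.
            return Arrays.copyOf(batch, truncatedSize);
        }

        public static void main(String[] args) {
            Object[] batch = { 0, 1, 2, 3 };
            Object[] truncated = copyBatch(batch, 2);
            System.out.println(Arrays.toString(truncated)); // prints [0, 1]
        }
    }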