diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java new file mode 100644 index 0000000000000000000000000000000000000000..26b85c0b598919697fc314066a62431d8b7fd2b1 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java @@ -0,0 +1,25 @@ +package eu.stratosphere.api.datastream; + +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.tuple.Tuple; +import eu.stratosphere.streaming.api.StreamCollector; +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class FlatMapInvokable extends UserTaskInvokable { + private static final long serialVersionUID = 1L; + + private FlatMapFunction flatMapper; + public FlatMapInvokable(FlatMapFunction flatMapper2) { + this.flatMapper = flatMapper2; + } + + @Override + public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + int batchSize = record.getBatchSize(); + for (int i = 0; i < batchSize; i++) { + T tuple = (T) record.getTuple(i); + flatMapper.flatMap(tuple, collector); + } + } +}