From 7904f10a8f8af772b27521793deaa4ecff7322b6 Mon Sep 17 00:00:00 2001 From: ghermann Date: Mon, 14 Jul 2014 16:29:15 +0200 Subject: [PATCH] [streaming] Added FlatMapInvokable --- .../api/datastream/FlatMapInvokable.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java new file mode 100644 index 00000000000..26b85c0b598 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/api/datastream/FlatMapInvokable.java @@ -0,0 +1,25 @@ +package eu.stratosphere.api.datastream; + +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.tuple.Tuple; +import eu.stratosphere.streaming.api.StreamCollector; +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; + +public class FlatMapInvokable extends UserTaskInvokable { + private static final long serialVersionUID = 1L; + + private FlatMapFunction flatMapper; + public FlatMapInvokable(FlatMapFunction flatMapper2) { + this.flatMapper = flatMapper2; + } + + @Override + public void invoke(StreamRecord record, StreamCollector collector) throws Exception { + int batchSize = record.getBatchSize(); + for (int i = 0; i < batchSize; i++) { + T tuple = (T) record.getTuple(i); + flatMapper.flatMap(tuple, collector); + } + } +} -- GitLab