diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java index 3f932ec51d71d6ac3da7b18fd5e80ece48782ac8..2c3453e573dd9711c386602638601d8a99f77ecc 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamComponentFactory.java @@ -8,6 +8,8 @@ import eu.stratosphere.nephele.event.task.EventListener; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordWriter; +import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; +import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.partitioner.DefaultPartitioner; import eu.stratosphere.streaming.partitioner.FieldsPartitioner; import eu.stratosphere.types.Key; @@ -48,7 +50,7 @@ public final class StreamComponentFactory { return numberOfInputs; } - //for StreamTask + // for StreamTask public static int setConfigOutputs(StreamTask taskBase, Configuration taskConfiguration, List> outputs, @@ -65,7 +67,7 @@ public final class StreamComponentFactory { } return numberOfOutputs; } - + // this function can be removed as duplication of the above function if // modification on kernel is allowed. // for StreamSource @@ -86,6 +88,25 @@ public final class StreamComponentFactory { return numberOfOutputs; } + public static UserTaskInvokable setUserFunction( + Configuration taskConfiguration, + List> outputs, String instanceID, + Map recordBuffer) { + + Class userFunctionClass = taskConfiguration + .getClass("userfunction", DefaultTaskInvokable.class, + UserTaskInvokable.class); + UserTaskInvokable userFunction = null; + + try { + userFunction = userFunctionClass.newInstance(); + userFunction.declareOutputs(outputs, instanceID, recordBuffer); + } catch (Exception e) { + + } + return userFunction; + } + public static void setPartitioner(Configuration taskConfiguration, int nrOutput, List> partitioners) { Class> partitioner = taskConfiguration diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java index bfc0b58ebffa4f0f60a6e9e2da2e80d23b75de9f..add776d3b1004e55c46f7c200088f7352a04cdc2 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java @@ -25,7 +25,6 @@ import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.template.AbstractTask; -import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.types.Record; @@ -56,18 +55,6 @@ public class StreamTask extends AbstractTask { recordBuffer = new TreeMap(); } - public void setUserFunction(Configuration taskConfiguration) { - Class userFunctionClass = taskConfiguration - .getClass("userfunction", DefaultTaskInvokable.class, - UserTaskInvokable.class); - try { - userFunction = userFunctionClass.newInstance(); - userFunction.declareOutputs(outputs, taskInstanceID, recordBuffer); - } catch (Exception e) { - - } - } - @Override public void registerInputOutput() { Configuration taskConfiguration = getTaskConfiguration(); @@ -77,7 +64,8 @@ public class StreamTask extends AbstractTask { numberOfOutputs = StreamComponentFactory.setConfigOutputs(this, taskConfiguration, outputs, partitioners); - setUserFunction(taskConfiguration); + userFunction = StreamComponentFactory.setUserFunction(taskConfiguration, outputs, + taskInstanceID, recordBuffer); StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID, outputs); }