提交 26443e68 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] UserFunction setter move to StreamComponent

上级 28ba7390
...@@ -8,6 +8,8 @@ import eu.stratosphere.nephele.event.task.EventListener; ...@@ -8,6 +8,8 @@ import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner; import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner; import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key; import eu.stratosphere.types.Key;
...@@ -48,7 +50,7 @@ public final class StreamComponentFactory { ...@@ -48,7 +50,7 @@ public final class StreamComponentFactory {
return numberOfInputs; return numberOfInputs;
} }
//for StreamTask // for StreamTask
public static int setConfigOutputs(StreamTask taskBase, public static int setConfigOutputs(StreamTask taskBase,
Configuration taskConfiguration, Configuration taskConfiguration,
List<RecordWriter<Record>> outputs, List<RecordWriter<Record>> outputs,
...@@ -65,7 +67,7 @@ public final class StreamComponentFactory { ...@@ -65,7 +67,7 @@ public final class StreamComponentFactory {
} }
return numberOfOutputs; return numberOfOutputs;
} }
// this function can be removed as duplication of the above function if // this function can be removed as duplication of the above function if
// modification on kernel is allowed. // modification on kernel is allowed.
// for StreamSource // for StreamSource
...@@ -86,6 +88,25 @@ public final class StreamComponentFactory { ...@@ -86,6 +88,25 @@ public final class StreamComponentFactory {
return numberOfOutputs; return numberOfOutputs;
} }
public static UserTaskInvokable setUserFunction(
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs, String instanceID,
Map<String, StreamRecord> recordBuffer) {
Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultTaskInvokable.class,
UserTaskInvokable.class);
UserTaskInvokable userFunction = null;
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, instanceID, recordBuffer);
} catch (Exception e) {
}
return userFunction;
}
public static void setPartitioner(Configuration taskConfiguration, public static void setPartitioner(Configuration taskConfiguration,
int nrOutput, List<ChannelSelector<Record>> partitioners) { int nrOutput, List<ChannelSelector<Record>> partitioners) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
......
...@@ -25,7 +25,6 @@ import eu.stratosphere.nephele.io.ChannelSelector; ...@@ -25,7 +25,6 @@ import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask; import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
...@@ -56,18 +55,6 @@ public class StreamTask extends AbstractTask { ...@@ -56,18 +55,6 @@ public class StreamTask extends AbstractTask {
recordBuffer = new TreeMap<String, StreamRecord>(); recordBuffer = new TreeMap<String, StreamRecord>();
} }
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultTaskInvokable.class,
UserTaskInvokable.class);
try {
userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs, taskInstanceID, recordBuffer);
} catch (Exception e) {
}
}
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration(); Configuration taskConfiguration = getTaskConfiguration();
...@@ -77,7 +64,8 @@ public class StreamTask extends AbstractTask { ...@@ -77,7 +64,8 @@ public class StreamTask extends AbstractTask {
numberOfOutputs = StreamComponentFactory.setConfigOutputs(this, numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
taskConfiguration, outputs, partitioners); taskConfiguration, outputs, partitioners);
setUserFunction(taskConfiguration); userFunction = StreamComponentFactory.setUserFunction(taskConfiguration, outputs,
taskInstanceID, recordBuffer);
StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID, StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID,
outputs); outputs);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册