提交 e34bcd2c 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Created StreamComponentFactory for the common methods of StreamSource and StreamTask

上级 5d40e871
package eu.stratosphere.streaming.api;
import java.util.List;
import java.util.Map;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public final class StreamComponentFactory {
//TODO: put setConfigInputs here
public static void setAckListener(Map<String, StreamRecord> recordBuffer,
String sourceInstanceID, List<RecordWriter<Record>> outputs) {
EventListener eventListener = new AckEventListener(sourceInstanceID,
recordBuffer);
for (RecordWriter<Record> output : outputs) {
// TODO: separate outputs
output.subscribeToEvent(eventListener, AckEvent.class);
}
}
public static void setPartitioner(Configuration taskConfiguration,
int nrOutput, List<ChannelSelector<Record>> partitioners) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class,
ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_"
+ nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass(
"partitionerClassParam_" + nrOutput, StringValue.class, Key.class);
partitioners.add(partitioner.getConstructor(int.class, Class.class)
.newInstance(keyPosition, keyClass));
} else {
partitioners.add(partitioner.newInstance());
}
} catch (Exception e) {
System.out.println("partitioner error" + " " + "partitioner_" + nrOutput);
System.out.println(e);
}
}
}
\ No newline at end of file
......@@ -21,18 +21,13 @@ import java.util.Map;
import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.test.RandIS;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class StreamSource extends AbstractInputTask<RandIS> {
......@@ -72,7 +67,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
setPartitioner(taskConfiguration, i);
StreamComponentFactory.setPartitioner(taskConfiguration, i, partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
......@@ -81,63 +76,24 @@ public class StreamSource extends AbstractInputTask<RandIS> {
}
setUserFunction(taskConfiguration);
setAckListener();
StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID, outputs);
}
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserSourceInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultSourceInvokable.class,
UserSourceInvokable.class);
try {
this.userFunction = userFunctionClass.newInstance();
this.userFunction.declareOutputs(outputs, sourceInstanceID, recordBuffer);
} catch (Exception e) {
}
}
private void setPartitioner(Configuration taskConfiguration, int nrOutput) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class,
ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_"
+ nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass(
"partitionerClassParam_" + nrOutput, StringValue.class, Key.class);
partitioners.add(partitioner.getConstructor(int.class, Class.class)
.newInstance(keyPosition, keyClass));
} else {
partitioners.add(partitioner.newInstance());
}
} catch (Exception e) {
System.out.println("partitioner error" + " " + "partitioner_" + nrOutput);
System.out.println(e);
}
}
public void setAckListener() {
EventListener eventListener = new AckEventListener(sourceInstanceID,
recordBuffer);
for (RecordWriter<Record> output : outputs) {
// TODO: separate outputs
output.subscribeToEvent(eventListener, AckEvent.class);
}
}
@Override
public void registerInputOutput() {
setConfigInputs();
}
@Override
......
......@@ -21,18 +21,13 @@ import java.util.Map;
import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
//TODO: Refactor, create common ancestor with StreamSource
public class StreamTask extends AbstractTask {
......@@ -43,7 +38,7 @@ public class StreamTask extends AbstractTask {
private UserTaskInvokable userFunction;
private int numberOfInputs;
private int numberOfOutputs;
private static int numTasks = 0;
private String taskInstanceID = "";
private Map<String, StreamRecord> recordBuffer;
......@@ -62,7 +57,6 @@ public class StreamTask extends AbstractTask {
}
private void setConfigInputs() {
Configuration taskConfiguration = getTaskConfiguration();
numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
......@@ -73,7 +67,7 @@ public class StreamTask extends AbstractTask {
numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
setPartitioner(taskConfiguration, i);
StreamComponentFactory.setPartitioner(taskConfiguration, i, partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
......@@ -82,20 +76,10 @@ public class StreamTask extends AbstractTask {
}
setUserFunction(taskConfiguration);
setAckListener();
}
public void setAckListener() {
EventListener eventListener = new AckEventListener(taskInstanceID,
recordBuffer);
for (RecordWriter<Record> output : outputs) {
// TODO: separate outputs
output.subscribeToEvent(eventListener, AckEvent.class);
}
StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID, outputs);
}
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultTaskInvokable.class,
UserTaskInvokable.class);
......@@ -105,32 +89,6 @@ public class StreamTask extends AbstractTask {
} catch (Exception e) {
}
}
private void setPartitioner(Configuration taskConfiguration, int nrOutput) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class,
ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_"
+ nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass(
"partitionerClassParam_" + nrOutput, StringValue.class, Key.class);
partitioners.add(partitioner.getConstructor(int.class, Class.class)
.newInstance(keyPosition, keyClass));
} else {
partitioners.add(partitioner.newInstance());
}
} catch (Exception e) {
System.out.println("partitioner error" + " " + "partitioner_" + nrOutput);
System.out.println(e);
}
}
@Override
......@@ -148,7 +106,7 @@ public class StreamTask extends AbstractTask {
hasInput = true;
StreamRecord streamRecord = new StreamRecord(input.next());
String id = streamRecord.popId();
//TODO: Enclose invoke in try-catch to properly fail records
// TODO: Enclose invoke in try-catch to properly fail records
userFunction.invoke(streamRecord.getRecord());
System.out.println(this.getClass().getName() + "-" + taskInstanceID);
System.out.println(recordBuffer.toString());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册