提交 98063ff6 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] solve issue #3

上级 4116c4bc
package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key;
......@@ -14,7 +17,7 @@ import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public final class StreamComponentFactory {
//TODO: put setConfigInputs here
// TODO: put setConfigInputs here
public static void setAckListener(Map<String, StreamRecord> recordBuffer,
String sourceInstanceID, List<RecordWriter<Record>> outputs) {
......@@ -26,27 +29,85 @@ public final class StreamComponentFactory {
}
}
public static int setConfigInputs(StreamTask taskBase,
Configuration taskConfiguration, List<RecordReader<Record>> inputs) {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
inputs.add(new RecordReader<Record>(taskBase, Record.class));
}
return numberOfInputs;
}
// this function can be removed as duplication of the above function if
// modification on kernel is allowed.
public static int setConfigInputs(StreamSink taskBase,
Configuration taskConfiguration, List<RecordReader<Record>> inputs) {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
inputs.add(new RecordReader<Record>(taskBase, Record.class));
}
return numberOfInputs;
}
public static int setConfigOutputs(StreamTask taskBase,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
List<ChannelSelector<Record>> partitioners) {
int numberOfOutputs = taskConfiguration
.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
StreamComponentFactory.setPartitioner(taskConfiguration, i,
partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(taskBase, Record.class,
outputPartitioner));
}
return numberOfOutputs;
}
// this function can be removed as duplication of the above function if
// modification on kernel is allowed.
public static int setConfigOutputs(StreamSource taskBase,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
List<ChannelSelector<Record>> partitioners) {
int numberOfOutputs = taskConfiguration
.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
StreamComponentFactory.setPartitioner(taskConfiguration, i,
partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(taskBase, Record.class,
outputPartitioner));
}
return numberOfOutputs;
}
public static void setPartitioner(Configuration taskConfiguration,
int nrOutput, List<ChannelSelector<Record>> partitioners) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class,
ChannelSelector.class);
.getClass("partitionerClass_" + nrOutput,
DefaultPartitioner.class, ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_"
+ nrOutput, 1);
int keyPosition = taskConfiguration.getInteger(
"partitionerIntParam_" + nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass(
"partitionerClassParam_" + nrOutput, StringValue.class, Key.class);
"partitionerClassParam_" + nrOutput, StringValue.class,
Key.class);
partitioners.add(partitioner.getConstructor(int.class, Class.class)
.newInstance(keyPosition, keyClass));
partitioners.add(partitioner.getConstructor(int.class,
Class.class).newInstance(keyPosition, keyClass));
} else {
partitioners.add(partitioner.newInstance());
}
} catch (Exception e) {
System.out.println("partitioner error" + " " + "partitioner_" + nrOutput);
System.out.println("partitioner error" + " " + "partitioner_"
+ nrOutput);
System.out.println(e);
}
}
......
......@@ -38,23 +38,19 @@ public class StreamSink extends AbstractOutputTask {
numberOfInputs = 0;
}
private void setConfigInputs() {
Configuration taskConfiguration = getTaskConfiguration();
numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
inputs.add(new RecordReader<Record>(this, Record.class));
}
setUserFunction(taskConfiguration);
}
// alternative: remove the comments on this function as well as the
// statement in cregisterInputOuput functions.
// private void setConfigInputs(Configuration taskConfiguration) {
// numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
// for (int i = 0; i < numberOfInputs; i++) {
// inputs.add(new RecordReader<Record>(this, Record.class));
// }
// }
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserSinkInvokable> userFunctionClass;
userFunctionClass = taskConfiguration.getClass("userfunction",
DefaultSinkInvokable.class, UserSinkInvokable.class);
Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultSinkInvokable.class,
UserSinkInvokable.class);
try {
userFunction = userFunctionClass.newInstance();
} catch (Exception e) {
......@@ -64,7 +60,11 @@ public class StreamSink extends AbstractOutputTask {
@Override
public void registerInputOutput() {
setConfigInputs();
Configuration taskConfiguration = getTaskConfiguration();
// setConfigInputs(taskConfiguration);
numberOfInputs = StreamComponentFactory.setConfigInputs(this,
taskConfiguration, inputs);
setUserFunction(taskConfiguration);
}
@Override
......
......@@ -35,7 +35,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
private List<ChannelSelector<Record>> partitioners;
private UserSourceInvokable userFunction;
private int numberOfOutputs;
private static int numSources = 0;
private String sourceInstanceID;
private Map<String, StreamRecord> recordBuffer;
......@@ -49,6 +49,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
numSources++;
sourceInstanceID = Integer.toString(numSources);
recordBuffer = new TreeMap<String, StreamRecord>();
}
@Override
......@@ -61,39 +62,43 @@ public class StreamSource extends AbstractInputTask<RandIS> {
return null;
}
private void setConfigInputs() {
Configuration taskConfiguration = getTaskConfiguration();
numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
StreamComponentFactory.setPartitioner(taskConfiguration, i, partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(this, Record.class,
outputPartitioner));
}
setUserFunction(taskConfiguration);
StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID, outputs);
}
// alternative: remove the comments on this function as well as the
// statement in cregisterInputOuput functions.
// private void setConfigOutputs(Configuration taskConfiguration) {
// numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
// for (int i = 1; i <= numberOfOutputs; i++) {
// StreamComponentFactory.setPartitioner(taskConfiguration, i,
// partitioners);
// }
// for (ChannelSelector<Record> outputPartitioner : partitioners) {
// outputs.add(new RecordWriter<Record>(this, Record.class,
// outputPartitioner));
// }
// }
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserSourceInvokable> userFunctionClass = taskConfiguration
.getClass("userfunction", DefaultSourceInvokable.class,
UserSourceInvokable.class);
try {
this.userFunction = userFunctionClass.newInstance();
this.userFunction.declareOutputs(outputs, sourceInstanceID, recordBuffer);
userFunction = userFunctionClass.newInstance();
userFunction
.declareOutputs(outputs, sourceInstanceID, recordBuffer);
} catch (Exception e) {
}
}
@Override
public void registerInputOutput() {
setConfigInputs();
Configuration taskConfiguration = getTaskConfiguration();
// setConfigOutputs(taskConfiguration);
numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
taskConfiguration, outputs, partitioners);
setUserFunction(taskConfiguration);
StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID,
outputs);
}
@Override
......
......@@ -56,28 +56,27 @@ public class StreamTask extends AbstractTask {
recordBuffer = new TreeMap<String, StreamRecord>();
}
private void setConfigInputs() {
Configuration taskConfiguration = getTaskConfiguration();
numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
inputs.add(new RecordReader<Record>(this, Record.class));
}
numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
StreamComponentFactory.setPartitioner(taskConfiguration, i, partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(this, Record.class,
outputPartitioner));
}
setUserFunction(taskConfiguration);
StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID, outputs);
}
// alternative: remove the comments on these two functions as well as the
// two statements in registerInputOuput functions.
// private void setConfigInputs(Configuration taskConfiguration) {
// numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
// for (int i = 0; i < numberOfInputs; i++) {
// inputs.add(new RecordReader<Record>(this, Record.class));
// }
// }
//
// private void setConfigOutputs(Configuration taskConfiguration) {
// numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
// for (int i = 1; i <= numberOfOutputs; i++) {
// StreamComponentFactory.setPartitioner(taskConfiguration, i,
// partitioners);
// }
// for (ChannelSelector<Record> outputPartitioner : partitioners) {
// outputs.add(new RecordWriter<Record>(this, Record.class,
// outputPartitioner));
// }
// }
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration
......@@ -93,7 +92,18 @@ public class StreamTask extends AbstractTask {
@Override
public void registerInputOutput() {
setConfigInputs();
Configuration taskConfiguration = getTaskConfiguration();
// setConfigInputs(taskConfiguration);
// setConfigOutputs(taskConfiguration);
numberOfInputs = StreamComponentFactory.setConfigInputs(this,
taskConfiguration, inputs);
numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
taskConfiguration, outputs, partitioners);
setUserFunction(taskConfiguration);
StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID,
outputs);
}
@Override
......@@ -106,9 +116,11 @@ public class StreamTask extends AbstractTask {
hasInput = true;
StreamRecord streamRecord = new StreamRecord(input.next());
String id = streamRecord.popId();
// TODO: Enclose invoke in try-catch to properly fail records
// TODO: Enclose invoke in try-catch to properly fail
// records
userFunction.invoke(streamRecord.getRecord());
System.out.println(this.getClass().getName() + "-" + taskInstanceID);
System.out.println(this.getClass().getName() + "-"
+ taskInstanceID);
System.out.println(recordBuffer.toString());
System.out.println("---------------------");
input.publishEvent(new AckEvent(id));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册