提交 fecbe358 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] updated streamrecord getrecord method

上级 04a717e0
package eu.stratosphere.streaming.api;
import java.util.Map;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
......@@ -11,7 +9,6 @@ public class AckEventListener implements EventListener {
private FaultTolerancyBuffer recordBuffer;
/**
 * Creates a listener bound to one task instance and its record buffer.
 *
 * @param taskInstanceID identifier stored on this listener
 * @param recordBuffer   fault tolerance buffer stored on this listener
 */
public AckEventListener(String taskInstanceID, FaultTolerancyBuffer recordBuffer) {
    this.taskInstanceID = taskInstanceID;
    this.recordBuffer = recordBuffer;
}
......
......@@ -12,14 +12,16 @@ public class FaultTolerancyBuffer {
private Map<String, StreamRecord> recordBuffer;
private Map<String, Integer> ackCounter;
private List<RecordWriter<Record>> outputs;
private String channelID;
private int numberOfOutputs;
public FaultTolerancyBuffer(List<RecordWriter<Record>> outputs) {
public FaultTolerancyBuffer(List<RecordWriter<Record>> outputs, String channelID) {
this.outputs=outputs;
this.recordBuffer = new HashMap<String, StreamRecord>();
this.ackCounter = new HashMap<String, Integer>();
this.numberOfOutputs = outputs.size();
this.channelID=channelID;
}
......@@ -52,7 +54,7 @@ public class FaultTolerancyBuffer {
public void failRecord(String recordID) {
// Create new id to avoid double counting acks
StreamRecord newRecord = new StreamRecord(popRecord(recordID)).addId();
StreamRecord newRecord = new StreamRecord(popRecord(recordID),channelID).addId();
addRecord(newRecord);
reEmit(newRecord.getRecord());
......
......@@ -31,6 +31,9 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
......
......@@ -13,7 +13,7 @@ public final class StreamRecord {
private String channelID = "";
public StreamRecord(Record record) {
this.record = record;
this.record = record.createCopy();
}
public StreamRecord(Record record, String channelID) {
......@@ -40,7 +40,13 @@ public final class StreamRecord {
}
public Record getRecord() {
return record;
Record newRecord=this.record.createCopy();
newRecord.removeField(newRecord.getNumFields() - 1);
return newRecord;
}
/**
 * Returns the record held by this StreamRecord as-is, without copying it.
 * NOTE(review): presumably the last field of the record carries the id - confirm.
 *
 * @return the internal record instance
 */
public Record getRecordWithId() {
    return record;
}
// TODO: write a proper toString
......
package eu.stratosphere.streaming.api.invokable;
import java.util.List;
import java.util.Map;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
......@@ -30,7 +28,7 @@ public abstract class StreamInvokable {
for (RecordWriter<Record> output : outputs) {
try {
output.emit(streamRecord.getRecord());
output.emit(streamRecord.getRecordWithId());
System.out.println(this.getClass().getName());
System.out.println("Emitted " + streamRecord.getId() + "-"
......
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.streamcomponent;
import java.util.List;
import java.util.Map;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.AckEventListener;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FailEventListener;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key;
......@@ -25,6 +33,16 @@ public final class StreamComponentFactory {
output.subscribeToEvent(eventListener, AckEvent.class);
}
}
/**
 * Creates one {@link FailEventListener} and subscribes it to {@link FailEvent}s
 * on every given output.
 *
 * @param recordBuffer     buffer handed to the listener
 * @param sourceInstanceID instance identifier handed to the listener
 * @param outputs          writers on which the listener is registered
 */
public static void setFailListener(FaultTolerancyBuffer recordBuffer,
        String sourceInstanceID, List<RecordWriter<Record>> outputs) {
    final EventListener failListener =
            new FailEventListener(sourceInstanceID, recordBuffer);
    // TODO: separate outputs
    for (RecordWriter<Record> writer : outputs) {
        writer.subscribeToEvent(failListener, FailEvent.class);
    }
}
// for StreamTask
public static int setConfigInputs(StreamTask taskBase,
......@@ -50,11 +68,14 @@ public final class StreamComponentFactory {
// for StreamTask
public static int setConfigOutputs(StreamTask taskBase,
Configuration taskConfiguration, List<RecordWriter<Record>> outputs,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
List<ChannelSelector<Record>> partitioners) {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
int numberOfOutputs = taskConfiguration
.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
StreamComponentFactory.setPartitioner(taskConfiguration, i, partitioners);
StreamComponentFactory.setPartitioner(taskConfiguration, i,
partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(taskBase, Record.class,
......@@ -67,11 +88,14 @@ public final class StreamComponentFactory {
// modification on kernel is allowed.
// for StreamSource
public static int setConfigOutputs(StreamSource taskBase,
Configuration taskConfiguration, List<RecordWriter<Record>> outputs,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
List<ChannelSelector<Record>> partitioners) {
int numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
int numberOfOutputs = taskConfiguration
.getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
StreamComponentFactory.setPartitioner(taskConfiguration, i, partitioners);
StreamComponentFactory.setPartitioner(taskConfiguration, i,
partitioners);
}
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(taskBase, Record.class,
......@@ -80,27 +104,64 @@ public final class StreamComponentFactory {
return numberOfOutputs;
}
/**
 * Instantiates the sink invokable class looked up under the "userfunction"
 * configuration key ({@code DefaultSinkInvokable} is passed as the default).
 *
 * @param taskConfiguration configuration the class is looked up in
 * @return a new invokable instance, or {@code null} if it could not be created
 */
public static UserSinkInvokable setUserFunction(Configuration taskConfiguration) {
    Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration
            .getClass("userfunction", DefaultSinkInvokable.class,
                    UserSinkInvokable.class);
    UserSinkInvokable userFunction = null;
    try {
        userFunction = userFunctionClass.newInstance();
    } catch (Exception e) {
        // Report the failure instead of swallowing it; callers still get null,
        // exactly as before.
        System.out.println("userfunction error" + " " + userFunctionClass);
        System.out.println(e);
    }
    return userFunction;
}
/**
 * Instantiates the invokable class looked up under the "userfunction"
 * configuration key and declares its outputs.
 *
 * @param taskConfiguration configuration the class is looked up in
 * @param outputs           writers passed to {@code declareOutputs}
 * @param instanceID        identifier passed to {@code declareOutputs}
 * @param recordBuffer      buffer passed to {@code declareOutputs}
 * @return the new invokable, or {@code null} if instantiation failed; when only
 *         {@code declareOutputs} fails, the created instance is still returned
 */
public static StreamInvokable setUserFunction(
        Configuration taskConfiguration,
        List<RecordWriter<Record>> outputs, String instanceID,
        FaultTolerancyBuffer recordBuffer) {
    // Default value is a TaskInvokable even if it was called from a source
    Class<? extends StreamInvokable> userFunctionClass = taskConfiguration
            .getClass("userfunction", DefaultTaskInvokable.class,
                    StreamInvokable.class);
    StreamInvokable userFunction = null;
    try {
        userFunction = userFunctionClass.newInstance();
        userFunction.declareOutputs(outputs, instanceID, recordBuffer);
    } catch (Exception e) {
        // Report the failure instead of swallowing it; the return value is
        // unchanged from the previous behavior.
        System.out.println("userfunction error" + " " + userFunctionClass);
        System.out.println(e);
    }
    return userFunction;
}
public static void setPartitioner(Configuration taskConfiguration,
int nrOutput, List<ChannelSelector<Record>> partitioners) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
.getClass("partitionerClass_" + nrOutput, DefaultPartitioner.class,
ChannelSelector.class);
.getClass("partitionerClass_" + nrOutput,
DefaultPartitioner.class, ChannelSelector.class);
try {
if (partitioner.equals(FieldsPartitioner.class)) {
int keyPosition = taskConfiguration.getInteger("partitionerIntParam_"
+ nrOutput, 1);
int keyPosition = taskConfiguration.getInteger(
"partitionerIntParam_" + nrOutput, 1);
Class<? extends Key> keyClass = taskConfiguration.getClass(
"partitionerClassParam_" + nrOutput, StringValue.class, Key.class);
"partitionerClassParam_" + nrOutput, StringValue.class,
Key.class);
partitioners.add(partitioner.getConstructor(int.class, Class.class)
.newInstance(keyPosition, keyClass));
partitioners.add(partitioner.getConstructor(int.class,
Class.class).newInstance(keyPosition, keyClass));
} else {
partitioners.add(partitioner.newInstance());
}
} catch (Exception e) {
System.out.println("partitioner error" + " " + "partitioner_" + nrOutput);
System.out.println("partitioner error" + " " + "partitioner_"
+ nrOutput);
System.out.println(e);
}
}
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.streamcomponent;
import java.util.LinkedList;
import java.util.List;
......@@ -21,7 +21,8 @@ import java.util.List;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.Record;
......@@ -29,33 +30,18 @@ public class StreamSink extends AbstractOutputTask {
private List<RecordReader<Record>> inputs;
private UserSinkInvokable userFunction;
private int numberOfInputs;
public StreamSink() {
// TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList<RecordReader<Record>>();
userFunction = null;
numberOfInputs = 0;
}
/**
 * Instantiates the sink invokable class looked up under the "userfunction"
 * configuration key and stores it in {@code userFunction}.
 * On failure the field is left unchanged and the error is printed.
 *
 * @param taskConfiguration configuration the class is looked up in
 */
public void setUserFunction(Configuration taskConfiguration) {
    Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration
            .getClass("userfunction", DefaultSinkInvokable.class,
                    UserSinkInvokable.class);
    try {
        userFunction = userFunctionClass.newInstance();
    } catch (Exception e) {
        // Report the failure instead of swallowing it silently.
        System.out.println("userfunction error" + " " + userFunctionClass);
        System.out.println(e);
    }
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
// setConfigInputs(taskConfiguration);
numberOfInputs = StreamComponentFactory.setConfigInputs(this,
taskConfiguration, inputs);
setUserFunction(taskConfiguration);
StreamComponentFactory.setConfigInputs(this, taskConfiguration, inputs);
userFunction = StreamComponentFactory.setUserFunction(taskConfiguration);
}
@Override
......@@ -67,7 +53,7 @@ public class StreamSink extends AbstractOutputTask {
if (input.hasNext()) {
hasInput = true;
StreamRecord rec = new StreamRecord(input.next());
String id = rec.popId();
String id = rec.getId();
userFunction.invoke(rec.getRecord());
input.publishEvent(new AckEvent(id));
}
......
......@@ -13,18 +13,16 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.streamcomponent;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.test.RandIS;
import eu.stratosphere.types.Record;
......@@ -34,8 +32,6 @@ public class StreamSource extends AbstractInputTask<RandIS> {
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private UserSourceInvokable userFunction;
private int numberOfOutputs;
private static int numSources = 0;
private String sourceInstanceID;
private FaultTolerancyBuffer recordBuffer;
......@@ -45,7 +41,6 @@ public class StreamSource extends AbstractInputTask<RandIS> {
outputs = new LinkedList<RecordWriter<Record>>();
partitioners = new LinkedList<ChannelSelector<Record>>();
userFunction = null;
numberOfOutputs = 0;
numSources++;
sourceInstanceID = Integer.toString(numSources);
......@@ -61,28 +56,19 @@ public class StreamSource extends AbstractInputTask<RandIS> {
return null;
}
/**
 * Instantiates the source invokable class looked up under the "userfunction"
 * configuration key, stores it in {@code userFunction} and declares its outputs.
 * Any failure is printed; state assigned before the failure is kept.
 *
 * @param taskConfiguration configuration the class is looked up in
 */
public void setUserFunction(Configuration taskConfiguration) {
    Class<? extends UserSourceInvokable> userFunctionClass = taskConfiguration
            .getClass("userfunction", DefaultSourceInvokable.class,
                    UserSourceInvokable.class);
    try {
        userFunction = userFunctionClass.newInstance();
        userFunction
                .declareOutputs(outputs, sourceInstanceID, recordBuffer);
    } catch (Exception e) {
        // Report the failure instead of swallowing it silently.
        System.out.println("userfunction error" + " " + userFunctionClass);
        System.out.println(e);
    }
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
taskConfiguration, outputs, partitioners);
recordBuffer=new FaultTolerancyBuffer(outputs);
setUserFunction(taskConfiguration);
StreamComponentFactory.setConfigOutputs(this, taskConfiguration,
outputs, partitioners);
recordBuffer = new FaultTolerancyBuffer(outputs,sourceInstanceID);
userFunction = (UserSourceInvokable) StreamComponentFactory
.setUserFunction(taskConfiguration, outputs, sourceInstanceID,
recordBuffer);
StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID,
outputs);
StreamComponentFactory.setFailListener(recordBuffer, sourceInstanceID,
outputs);
}
......
......@@ -13,32 +13,29 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
package eu.stratosphere.streaming.api.streamcomponent;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.Record;
//TODO: Refactor, create common ancestor with StreamSource
public class StreamTask extends AbstractTask {
private List<RecordReader<Record>> inputs;
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private UserTaskInvokable userFunction;
private int numberOfInputs;
private int numberOfOutputs;
private static int numTasks = 0;
private String taskInstanceID = "";
......@@ -50,39 +47,26 @@ public class StreamTask extends AbstractTask {
outputs = new LinkedList<RecordWriter<Record>>();
partitioners = new LinkedList<ChannelSelector<Record>>();
userFunction = null;
numberOfInputs = 0;
numberOfOutputs = 0;
numTasks++;
taskInstanceID = Integer.toString(numTasks);
}
/**
 * Instantiates the task invokable class looked up under the "userfunction"
 * configuration key, stores it in {@code userFunction} and declares its outputs.
 * Any failure is printed; state assigned before the failure is kept.
 *
 * @param taskConfiguration configuration the class is looked up in
 */
public void setUserFunction(Configuration taskConfiguration) {
    Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration
            .getClass("userfunction", DefaultTaskInvokable.class,
                    UserTaskInvokable.class);
    try {
        userFunction = userFunctionClass.newInstance();
        userFunction.declareOutputs(outputs, taskInstanceID, recordBuffer);
    } catch (Exception e) {
        // Report the failure instead of swallowing it silently.
        System.out.println("userfunction error" + " " + userFunctionClass);
        System.out.println(e);
    }
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
numberOfInputs = StreamComponentFactory.setConfigInputs(this,
taskConfiguration, inputs);
numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
taskConfiguration, outputs, partitioners);
recordBuffer = new FaultTolerancyBuffer(outputs);
StreamComponentFactory.setConfigInputs(this, taskConfiguration, inputs);
StreamComponentFactory.setConfigOutputs(this, taskConfiguration, outputs,
partitioners);
setUserFunction(taskConfiguration);
recordBuffer = new FaultTolerancyBuffer(outputs, taskInstanceID);
userFunction = (UserTaskInvokable) StreamComponentFactory.setUserFunction(
taskConfiguration, outputs, taskInstanceID, recordBuffer);
StreamComponentFactory
.setAckListener(recordBuffer, taskInstanceID, outputs);
StreamComponentFactory.setFailListener(recordBuffer, taskInstanceID,
outputs);
}
@Override
......@@ -94,14 +78,20 @@ public class StreamTask extends AbstractTask {
if (input.hasNext()) {
hasInput = true;
StreamRecord streamRecord = new StreamRecord(input.next());
String id = streamRecord.popId();
String id = streamRecord.getId();
// TODO: Enclose invoke in try-catch to properly fail
// records
userFunction.invoke(streamRecord.getRecord());
System.out.println(this.getClass().getName() + "-" + taskInstanceID);
System.out.println(recordBuffer.getRecordBuffer());
System.out.println("---------------------");
input.publishEvent(new AckEvent(id));
try {
userFunction.invoke(streamRecord.getRecord());
System.out
.println(this.getClass().getName() + "-" + taskInstanceID);
System.out.println(recordBuffer.getRecordBuffer());
System.out.println("---------------------");
input.publishEvent(new AckEvent(id));
} catch (Exception e) {
input.publishEvent(new FailEvent(id));
}
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册