提交 02e7747c 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] emit update

上级 960cecc9
package eu.stratosphere.streaming.api;
import java.util.Map;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
public class AckEventListener implements EventListener {
private String taskInstanceID;
private FaultTolerancyBuffer recordBuffer;
private Map<String, StreamRecord> recordBuffer;
public AckEventListener(String taskInstanceID,FaultTolerancyBuffer recordBuffer) {
public AckEventListener(String taskInstanceID,Map<String, StreamRecord> recordBuffer) {
this.taskInstanceID=taskInstanceID;
this.recordBuffer=recordBuffer;
}
......@@ -19,8 +21,8 @@ public class AckEventListener implements EventListener {
String ackCID = recordId.split("-", 2)[0];
if (ackCID.equals(taskInstanceID)) {
System.out.println("Ack recieved " + ackEvent.getRecordId());
recordBuffer.ackRecord(ackEvent.getRecordId());
System.out.println(recordBuffer.getRecordBuffer());
recordBuffer.remove(ackEvent.getRecordId());
System.out.println(recordBuffer);
System.out.println("---------------------");
}
......
package eu.stratosphere.streaming.api;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
/**
 * Task event that reports a record whose processing failed, identified by its
 * record id.
 *
 * <p>The id is written by {@link #write(DataOutput)} and restored by
 * {@link #read(DataInput)}, so it is preserved if the event is serialized.
 */
public class FailEvent extends AbstractTaskEvent {

	private String recordId;

	/**
	 * Creates an event without a record id.
	 *
	 * <p>NOTE(review): presumably required when the runtime re-creates the
	 * event reflectively before calling {@link #read(DataInput)} - confirm
	 * against the event deserialization code.
	 */
	public FailEvent() {
	}

	/**
	 * Creates a fail event for the given record and prints a trace line.
	 *
	 * @param recordId
	 *            id of the record that failed
	 */
	public FailEvent(String recordId) {
		// Assign the field directly: calling the overridable setter from a
		// constructor would run subclass code on a half-built object.
		this.recordId = recordId;
		System.out.println("Fail sent " + recordId);
		System.out.println("---------------------");
	}

	/**
	 * Writes a presence flag followed by the record id (if any).
	 *
	 * @param out
	 *            target to write to
	 * @throws IOException
	 *             if writing fails
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		// writeUTF cannot encode null, hence the explicit presence flag.
		boolean hasId = recordId != null;
		out.writeBoolean(hasId);
		if (hasId) {
			out.writeUTF(recordId);
		}
	}

	/**
	 * Restores the record id written by {@link #write(DataOutput)}.
	 *
	 * @param in
	 *            source to read from
	 * @throws IOException
	 *             if reading fails
	 */
	@Override
	public void read(DataInput in) throws IOException {
		recordId = in.readBoolean() ? in.readUTF() : null;
	}

	/**
	 * @param recordId
	 *            id of the failed record
	 */
	public void setRecordId(String recordId) {
		this.recordId = recordId;
	}

	/**
	 * @return id of the failed record, or {@code null} if none was set
	 */
	public String getRecordId() {
		return this.recordId;
	}
}
\ No newline at end of file
package eu.stratosphere.streaming.api;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
/**
 * Listener that reacts to {@link FailEvent}s by asking the
 * {@link FaultTolerancyBuffer} to fail (and thereby re-emit) the record.
 *
 * <p>Only events whose record id starts with this listener's task instance id
 * (the part before the first {@code '-'}) are handled; all others are ignored.
 */
public class FailEventListener implements EventListener {

	private final String taskInstanceID;
	private final FaultTolerancyBuffer recordBuffer;

	/**
	 * @param taskInstanceID
	 *            id of the task instance whose records this listener handles
	 * @param recordBuffer
	 *            buffer holding the emitted records of that task instance
	 */
	public FailEventListener(String taskInstanceID,
			FaultTolerancyBuffer recordBuffer) {
		this.taskInstanceID = taskInstanceID;
		this.recordBuffer = recordBuffer;
	}

	/**
	 * Handles a fail event. Events of another type, or without a record id,
	 * are ignored instead of raising an exception in the caller.
	 *
	 * @param event
	 *            the event that occurred
	 */
	public void eventOccurred(AbstractTaskEvent event) {
		if (!(event instanceof FailEvent)) {
			return;
		}
		String recordId = ((FailEvent) event).getRecordId();
		if (recordId == null) {
			return;
		}

		// The record id is prefixed with the id of the emitting instance.
		String failCID = recordId.split("-", 2)[0];
		if (failCID.equals(taskInstanceID)) {
			System.out.println("Fail recieved " + recordId);
			recordBuffer.failRecord(recordId);
			System.out.println(recordBuffer.getRecordBuffer());
			System.out.println("---------------------");
		}
	}
}
package eu.stratosphere.streaming.api;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.Record;
/**
 * Keeps emitted records until every output has acknowledged them, and
 * re-emits a record under a fresh id when it is reported as failed.
 *
 * <p>Two maps are kept in step: the buffered record per id, and the number of
 * acknowledgements still outstanding per id (initially the number of outputs).
 *
 * <p>NOTE(review): the maps are plain {@link HashMap}s with no
 * synchronization; confirm that emitting and event handling happen on the
 * same thread.
 */
public class FaultTolerancyBuffer {

	private final Map<String, StreamRecord> recordBuffer;
	private final Map<String, Integer> ackCounter;
	private final List<RecordWriter<Record>> outputs;
	private final String channelID;
	private final int numberOfOutputs;

	/**
	 * @param outputs
	 *            writers used to re-emit failed records; their count is the
	 *            number of acks expected per record
	 * @param channelID
	 *            id passed to the {@link StreamRecord}s created for re-emission
	 */
	public FaultTolerancyBuffer(List<RecordWriter<Record>> outputs, String channelID) {
		this.outputs = outputs;
		this.recordBuffer = new HashMap<String, StreamRecord>();
		this.ackCounter = new HashMap<String, Integer>();
		this.numberOfOutputs = outputs.size();
		this.channelID = channelID;
	}

	/**
	 * Buffers a record and expects one ack per output for it.
	 *
	 * @param streamRecord
	 *            record to buffer, keyed by its id
	 */
	public void addRecord(StreamRecord streamRecord) {
		String id = streamRecord.getId();
		recordBuffer.put(id, streamRecord);
		ackCounter.put(id, numberOfOutputs);
	}

	/**
	 * Removes a record and its ack counter from the buffer.
	 *
	 * @param recordID
	 *            id of the record to remove
	 * @return the result of {@code getRecord()} on the removed entry, or
	 *         {@code null} if no record with that id is buffered
	 */
	public Record popRecord(String recordID) {
		StreamRecord removed = recordBuffer.remove(recordID);
		ackCounter.remove(recordID);
		return removed == null ? null : removed.getRecord();
	}

	/**
	 * Registers one acknowledgement; the record is dropped once no acks are
	 * outstanding. Acks for unknown ids are ignored.
	 *
	 * @param recordID
	 *            id of the acknowledged record
	 */
	public void ackRecord(String recordID) {
		Integer pending = ackCounter.get(recordID);
		if (pending == null) {
			return;
		}
		int ackCount = pending - 1;
		// "<= 0" also evicts records registered with zero outputs, which
		// would otherwise go negative and never be removed.
		if (ackCount <= 0) {
			recordBuffer.remove(recordID);
			ackCounter.remove(recordID);
		} else {
			ackCounter.put(recordID, ackCount);
		}
	}

	/**
	 * Re-registers a failed record under a new id and emits it again on all
	 * outputs. Does nothing if the id is not buffered (for example because the
	 * record was already acknowledged or already failed once).
	 *
	 * @param recordID
	 *            id of the failed record
	 */
	public void failRecord(String recordID) {
		Record failed = popRecord(recordID);
		if (failed == null) {
			return;
		}
		// Create new id to avoid double counting acks
		StreamRecord newRecord = new StreamRecord(failed, channelID).addId();
		addRecord(newRecord);
		// Emit the record including its new id, as the regular emit path does;
		// getRecord() would strip the id field and receivers could not ack it.
		reEmit(newRecord.getRecordWithId());
	}

	/**
	 * Emits the record on every output. A failing output is reported and
	 * skipped so the remaining outputs are still served.
	 *
	 * @param record
	 *            record to emit
	 */
	public void reEmit(Record record) {
		for (RecordWriter<Record> output : outputs) {
			try {
				output.emit(record);
			} catch (Exception e) {
				System.out.println("Re-emit failed");
			}
		}
	}

	/**
	 * @return the live map of buffered records (not a copy)
	 */
	public Map<String, StreamRecord> getRecordBuffer() {
		return this.recordBuffer;
	}
}
......@@ -31,9 +31,6 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
......
package eu.stratosphere.streaming.api.streamcomponent;
package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.AckEventListener;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FailEventListener;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.StreamInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.types.Key;
......@@ -23,8 +17,9 @@ import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public final class StreamComponentFactory {
// TODO: put setConfigInputs here
public static void setAckListener(FaultTolerancyBuffer recordBuffer,
public static void setAckListener(Map<String, StreamRecord> recordBuffer,
String sourceInstanceID, List<RecordWriter<Record>> outputs) {
EventListener eventListener = new AckEventListener(sourceInstanceID,
recordBuffer);
......@@ -33,18 +28,7 @@ public final class StreamComponentFactory {
output.subscribeToEvent(eventListener, AckEvent.class);
}
}
/**
 * Subscribes a single {@link FailEventListener} to {@link FailEvent}s on
 * every given output writer.
 *
 * @param recordBuffer
 *            buffer handed to the listener
 * @param sourceInstanceID
 *            instance id handed to the listener
 * @param outputs
 *            writers on which the listener is registered
 */
public static void setFailListener(FaultTolerancyBuffer recordBuffer,
		String sourceInstanceID, List<RecordWriter<Record>> outputs) {
	// The same listener instance is shared by all writers.
	final EventListener failListener =
			new FailEventListener(sourceInstanceID, recordBuffer);

	// TODO: separate outputs
	for (RecordWriter<Record> writer : outputs) {
		writer.subscribeToEvent(failListener, FailEvent.class);
	}
}
// for StreamTask
public static int setConfigInputs(StreamTask taskBase,
Configuration taskConfiguration, List<RecordReader<Record>> inputs) {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
......@@ -56,7 +40,6 @@ public final class StreamComponentFactory {
// this function can be removed as duplication of the above function if
// modification on kernel is allowed.
// for StreamSink
public static int setConfigInputs(StreamSink taskBase,
Configuration taskConfiguration, List<RecordReader<Record>> inputs) {
int numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
......@@ -66,7 +49,6 @@ public final class StreamComponentFactory {
return numberOfInputs;
}
// for StreamTask
public static int setConfigOutputs(StreamTask taskBase,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
......@@ -83,10 +65,9 @@ public final class StreamComponentFactory {
}
return numberOfOutputs;
}
// this function can be removed as duplication of the above function if
// modification on kernel is allowed.
// for StreamSource
public static int setConfigOutputs(StreamSource taskBase,
Configuration taskConfiguration,
List<RecordWriter<Record>> outputs,
......@@ -104,41 +85,6 @@ public final class StreamComponentFactory {
return numberOfOutputs;
}
/**
 * Instantiates the sink user function configured under the key
 * {@code "userfunction"}, falling back to {@link DefaultSinkInvokable}.
 *
 * @param taskConfiguration
 *            configuration to read the user function class from
 * @return the new user function, or {@code null} if it could not be
 *         instantiated (the failure is reported on standard output)
 */
public static UserSinkInvokable setUserFunction(Configuration taskConfiguration) {
	Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration
			.getClass("userfunction", DefaultSinkInvokable.class,
					UserSinkInvokable.class);
	UserSinkInvokable userFunction = null;
	try {
		userFunction = userFunctionClass.newInstance();
	} catch (Exception e) {
		// Still return null as before, but do not hide the cause: a missing
		// no-arg constructor would otherwise only show up later as an NPE.
		System.out.println("Could not instantiate user function "
				+ userFunctionClass + ": " + e);
	}
	return userFunction;
}
/**
 * Instantiates the user function configured under the key
 * {@code "userfunction"} and declares its outputs.
 *
 * @param taskConfiguration
 *            configuration to read the user function class from
 * @param outputs
 *            output writers passed to {@code declareOutputs}
 * @param instanceID
 *            instance id passed to {@code declareOutputs}
 * @param recordBuffer
 *            record buffer passed to {@code declareOutputs}
 * @return the user function; {@code null} if instantiation failed. If only
 *         {@code declareOutputs} failed, the instance is still returned.
 *         Failures are reported on standard output.
 */
public static StreamInvokable setUserFunction(
		Configuration taskConfiguration,
		List<RecordWriter<Record>> outputs, String instanceID,
		FaultTolerancyBuffer recordBuffer) {
	//Default value is a TaskInvokable even if it was called from a source
	Class<? extends StreamInvokable> userFunctionClass = taskConfiguration
			.getClass("userfunction", DefaultTaskInvokable.class,
					StreamInvokable.class);
	StreamInvokable userFunction = null;
	try {
		userFunction = userFunctionClass.newInstance();
		userFunction.declareOutputs(outputs, instanceID, recordBuffer);
	} catch (Exception e) {
		// Same return value as before, but the cause is no longer swallowed.
		System.out.println("Could not set up user function "
				+ userFunctionClass + ": " + e);
	}
	return userFunction;
}
public static void setPartitioner(Configuration taskConfiguration,
int nrOutput, List<ChannelSelector<Record>> partitioners) {
Class<? extends ChannelSelector<Record>> partitioner = taskConfiguration
......
......@@ -13,7 +13,7 @@ public final class StreamRecord {
private String channelID = "";
public StreamRecord(Record record) {
this.record = record.createCopy();
this.record = record;
}
public StreamRecord(Record record, String channelID) {
......@@ -40,13 +40,7 @@ public final class StreamRecord {
}
public Record getRecord() {
Record newRecord=this.record.createCopy();
newRecord.removeField(newRecord.getNumFields() - 1);
return newRecord;
}
public Record getRecordWithId() {
return this.record;
return record;
}
// TODO:write proper toString
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
......@@ -21,8 +21,7 @@ import java.util.List;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.Record;
......@@ -30,18 +29,42 @@ public class StreamSink extends AbstractOutputTask {
private List<RecordReader<Record>> inputs;
private UserSinkInvokable userFunction;
private int numberOfInputs;
/**
 * Creates a sink with an empty input list, no user function and an input
 * count of zero.
 */
public StreamSink() {
	// TODO: Make configuration file visible and call setClassInputs() here
	this.numberOfInputs = 0;
	this.userFunction = null;
	this.inputs = new LinkedList<RecordReader<Record>>();
}
// Alternative: uncomment this function together with the corresponding
// call in registerInputOutput().
// private void setConfigInputs(Configuration taskConfiguration) {
// numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
// for (int i = 0; i < numberOfInputs; i++) {
// inputs.add(new RecordReader<Record>(this, Record.class));
// }
// }
/**
 * Instantiates the sink user function configured under the key
 * {@code "userfunction"} (default: {@link DefaultSinkInvokable}) and stores
 * it in {@code userFunction}.
 *
 * <p>If instantiation fails, {@code userFunction} is left unchanged and the
 * failure is reported on standard output.
 *
 * @param taskConfiguration
 *            configuration to read the user function class from
 */
public void setUserFunction(Configuration taskConfiguration) {
	Class<? extends UserSinkInvokable> userFunctionClass = taskConfiguration
			.getClass("userfunction", DefaultSinkInvokable.class,
					UserSinkInvokable.class);
	try {
		userFunction = userFunctionClass.newInstance();
	} catch (Exception e) {
		// Do not hide the cause: otherwise the failure only shows up later
		// as a NullPointerException when the user function is invoked.
		System.out.println("Could not instantiate user function "
				+ userFunctionClass + ": " + e);
	}
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
StreamComponentFactory.setConfigInputs(this, taskConfiguration, inputs);
userFunction = StreamComponentFactory.setUserFunction(taskConfiguration);
// setConfigInputs(taskConfiguration);
numberOfInputs = StreamComponentFactory.setConfigInputs(this,
taskConfiguration, inputs);
setUserFunction(taskConfiguration);
}
@Override
......@@ -53,7 +76,7 @@ public class StreamSink extends AbstractOutputTask {
if (input.hasNext()) {
hasInput = true;
StreamRecord rec = new StreamRecord(input.next());
String id = rec.getId();
String id = rec.popId();
userFunction.invoke(rec.getRecord());
input.publishEvent(new AckEvent(id));
}
......
......@@ -13,16 +13,18 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.test.RandIS;
import eu.stratosphere.types.Record;
......@@ -32,17 +34,21 @@ public class StreamSource extends AbstractInputTask<RandIS> {
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private UserSourceInvokable userFunction;
private int numberOfOutputs;
private static int numSources = 0;
private String sourceInstanceID;
private FaultTolerancyBuffer recordBuffer;
private Map<String, StreamRecord> recordBuffer;
/**
 * Creates a source with empty output and partitioner lists, an empty record
 * buffer and no user function.
 *
 * <p>The instance id is taken from the static {@code numSources} counter
 * after incrementing it, so the first source constructed gets id "1".
 */
public StreamSource() {
	// TODO: Make configuration file visible and call setClassInputs() here
	this.outputs = new LinkedList<RecordWriter<Record>>();
	this.partitioners = new LinkedList<ChannelSelector<Record>>();
	this.recordBuffer = new TreeMap<String, StreamRecord>();
	this.userFunction = null;
	this.numberOfOutputs = 0;
	// Pre-increment: bump the shared counter, then use the new value as id.
	this.sourceInstanceID = Integer.toString(++numSources);
}
......@@ -56,19 +62,42 @@ public class StreamSource extends AbstractInputTask<RandIS> {
return null;
}
// Alternative: uncomment this function together with the corresponding
// call in registerInputOutput().
// private void setConfigOutputs(Configuration taskConfiguration) {
// numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
// for (int i = 1; i <= numberOfOutputs; i++) {
// StreamComponentFactory.setPartitioner(taskConfiguration, i,
// partitioners);
// }
// for (ChannelSelector<Record> outputPartitioner : partitioners) {
// outputs.add(new RecordWriter<Record>(this, Record.class,
// outputPartitioner));
// }
// }
/**
 * Instantiates the source user function configured under the key
 * {@code "userfunction"} (default: {@link DefaultSourceInvokable}), stores it
 * in {@code userFunction} and declares its outputs.
 *
 * <p>Failures are reported on standard output. If instantiation fails,
 * {@code userFunction} is left unchanged; if only {@code declareOutputs}
 * fails, the new instance stays assigned.
 *
 * @param taskConfiguration
 *            configuration to read the user function class from
 */
public void setUserFunction(Configuration taskConfiguration) {
	Class<? extends UserSourceInvokable> userFunctionClass = taskConfiguration
			.getClass("userfunction", DefaultSourceInvokable.class,
					UserSourceInvokable.class);
	try {
		userFunction = userFunctionClass.newInstance();
		userFunction
				.declareOutputs(outputs, sourceInstanceID, recordBuffer);
	} catch (Exception e) {
		// Do not hide the cause: otherwise the failure only shows up later
		// as a NullPointerException when the user function is invoked.
		System.out.println("Could not set up user function "
				+ userFunctionClass + ": " + e);
	}
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
StreamComponentFactory.setConfigOutputs(this, taskConfiguration,
outputs, partitioners);
recordBuffer = new FaultTolerancyBuffer(outputs,sourceInstanceID);
userFunction = (UserSourceInvokable) StreamComponentFactory
.setUserFunction(taskConfiguration, outputs, sourceInstanceID,
recordBuffer);
// setConfigOutputs(taskConfiguration);
numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
taskConfiguration, outputs, partitioners);
setUserFunction(taskConfiguration);
StreamComponentFactory.setAckListener(recordBuffer, sourceInstanceID,
outputs);
StreamComponentFactory.setFailListener(recordBuffer, sourceInstanceID,
outputs);
}
......@@ -77,7 +106,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
userFunction.invoke();
System.out.println(this.getClass().getName() + "-" + sourceInstanceID);
System.out.println(recordBuffer.getRecordBuffer());
System.out.println(recordBuffer.toString());
System.out.println("---------------------");
}
......
......@@ -13,33 +13,35 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.streamcomponent;
package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.Record;
//TODO: Refactor, create common ancestor with StreamSource
public class StreamTask extends AbstractTask {
private List<RecordReader<Record>> inputs;
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private UserTaskInvokable userFunction;
private int numberOfInputs;
private int numberOfOutputs;
private static int numTasks = 0;
private String taskInstanceID = "";
private FaultTolerancyBuffer recordBuffer;
private Map<String, StreamRecord> recordBuffer;
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -47,25 +49,60 @@ public class StreamTask extends AbstractTask {
outputs = new LinkedList<RecordWriter<Record>>();
partitioners = new LinkedList<ChannelSelector<Record>>();
userFunction = null;
numberOfInputs = 0;
numberOfOutputs = 0;
numTasks++;
taskInstanceID = Integer.toString(numTasks);
recordBuffer = new TreeMap<String, StreamRecord>();
}
// Alternative: uncomment these two functions together with the two
// corresponding calls in registerInputOutput().
// private void setConfigInputs(Configuration taskConfiguration) {
// numberOfInputs = taskConfiguration.getInteger("numberOfInputs", 0);
// for (int i = 0; i < numberOfInputs; i++) {
// inputs.add(new RecordReader<Record>(this, Record.class));
// }
// }
//
// private void setConfigOutputs(Configuration taskConfiguration) {
// numberOfOutputs = taskConfiguration.getInteger("numberOfOutputs", 0);
// for (int i = 1; i <= numberOfOutputs; i++) {
// StreamComponentFactory.setPartitioner(taskConfiguration, i,
// partitioners);
// }
// for (ChannelSelector<Record> outputPartitioner : partitioners) {
// outputs.add(new RecordWriter<Record>(this, Record.class,
// outputPartitioner));
// }
// }
/**
 * Instantiates the task user function configured under the key
 * {@code "userfunction"} (default: {@link DefaultTaskInvokable}), stores it
 * in {@code userFunction} and declares its outputs.
 *
 * <p>Failures are reported on standard output. If instantiation fails,
 * {@code userFunction} is left unchanged; if only {@code declareOutputs}
 * fails, the new instance stays assigned.
 *
 * @param taskConfiguration
 *            configuration to read the user function class from
 */
public void setUserFunction(Configuration taskConfiguration) {
	Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration
			.getClass("userfunction", DefaultTaskInvokable.class,
					UserTaskInvokable.class);
	try {
		userFunction = userFunctionClass.newInstance();
		userFunction.declareOutputs(outputs, taskInstanceID, recordBuffer);
	} catch (Exception e) {
		// Do not hide the cause: otherwise the failure only shows up later
		// as a NullPointerException when the user function is invoked.
		System.out.println("Could not set up user function "
				+ userFunctionClass + ": " + e);
	}
}
@Override
public void registerInputOutput() {
Configuration taskConfiguration = getTaskConfiguration();
// setConfigInputs(taskConfiguration);
// setConfigOutputs(taskConfiguration);
StreamComponentFactory.setConfigInputs(this, taskConfiguration, inputs);
StreamComponentFactory.setConfigOutputs(this, taskConfiguration, outputs,
partitioners);
numberOfInputs = StreamComponentFactory.setConfigInputs(this,
taskConfiguration, inputs);
numberOfOutputs = StreamComponentFactory.setConfigOutputs(this,
taskConfiguration, outputs, partitioners);
recordBuffer = new FaultTolerancyBuffer(outputs, taskInstanceID);
userFunction = (UserTaskInvokable) StreamComponentFactory.setUserFunction(
taskConfiguration, outputs, taskInstanceID, recordBuffer);
StreamComponentFactory
.setAckListener(recordBuffer, taskInstanceID, outputs);
StreamComponentFactory.setFailListener(recordBuffer, taskInstanceID,
setUserFunction(taskConfiguration);
StreamComponentFactory.setAckListener(recordBuffer, taskInstanceID,
outputs);
}
......@@ -78,20 +115,15 @@ public class StreamTask extends AbstractTask {
if (input.hasNext()) {
hasInput = true;
StreamRecord streamRecord = new StreamRecord(input.next());
String id = streamRecord.getId();
String id = streamRecord.popId();
// TODO: Enclose invoke in try-catch to properly fail
// records
try {
userFunction.invoke(streamRecord.getRecord());
System.out
.println(this.getClass().getName() + "-" + taskInstanceID);
System.out.println(recordBuffer.getRecordBuffer());
System.out.println("---------------------");
input.publishEvent(new AckEvent(id));
} catch (Exception e) {
input.publishEvent(new FailEvent(id));
}
userFunction.invoke(streamRecord.getRecord());
System.out.println(this.getClass().getName() + "-"
+ taskInstanceID);
System.out.println(recordBuffer.toString());
System.out.println("---------------------");
input.publishEvent(new AckEvent(id));
}
}
}
......
package eu.stratosphere.streaming.api.invokable;
import java.util.List;
import java.util.Map;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.Record;
......@@ -11,10 +12,10 @@ public abstract class StreamInvokable {
private List<RecordWriter<Record>> outputs;
protected String channelID;
private FaultTolerancyBuffer emittedRecords;
private Map<String, StreamRecord> emittedRecords;
public final void declareOutputs(List<RecordWriter<Record>> outputs,
String channelID, FaultTolerancyBuffer emittedRecords) {
String channelID, Map<String, StreamRecord> emittedRecords) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
......@@ -23,12 +24,12 @@ public abstract class StreamInvokable {
public final void emit(Record record) {
StreamRecord streamRecord = new StreamRecord(record, channelID).addId();
emittedRecords.addRecord(streamRecord);
emittedRecords.put(streamRecord.getId(), streamRecord);
for (RecordWriter<Record> output : outputs) {
try {
output.emit(streamRecord.getRecordWithId());
output.emit(streamRecord.getRecord());
System.out.println(this.getClass().getName());
System.out.println("Emitted " + streamRecord.getId() + "-"
......@@ -37,7 +38,6 @@ public abstract class StreamInvokable {
} catch (Exception e) {
System.out.println("Emit error");
emittedRecords.failRecord(streamRecord.getId());
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册