提交 038f4f96 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Added AckEvent

上级 d5f2d7fa
package eu.stratosphere.streaming.api;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
public class AckEvent extends AbstractTaskEvent {
private Long recordId;
public AckEvent(Long recordId) {
setRecordId(recordId);
System.out.println("created " + recordId);
}
@Override
public void write(DataOutput out) throws IOException {}
@Override
public void read(DataInput in) throws IOException {}
public void setRecordId(Long recordId) {
this.recordId = recordId;
}
public Long getRecordId() {
return this.recordId;
}
}
\ No newline at end of file
......@@ -15,10 +15,15 @@
package eu.stratosphere.streaming.api;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
......@@ -71,8 +76,25 @@ public class StreamTask extends AbstractTask {
}
setUserFunction(taskConfiguration);
setAckListener();
}
public void setAckListener() {
EventListener eventListener = new EventListener() {
@Override
public void eventOccurred(AbstractTaskEvent event) {
AckEvent ackEvent = (AckEvent) event;
Long recordId = ackEvent.getRecordId();
System.out.println("acked " + recordId);
//TODO: resend record with the given id
}
};
for (RecordWriter output : outputs) {
//TODO: separate outputs
output.subscribeToEvent(eventListener, AckEvent.class);
}
}
public void setUserFunction(Configuration taskConfiguration) {
Class<? extends UserTaskInvokable> userFunctionClass = taskConfiguration
......@@ -129,8 +151,8 @@ public class StreamTask extends AbstractTask {
hasInput = true;
StreamRecord streamRecord = new StreamRecord(input.next());
Long id = streamRecord.popId();
userFunction.invoke(streamRecord.getRecord());
input.publishEvent(new AckEvent(id));
//TODO: ack here
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册