diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java new file mode 100644 index 0000000000000000000000000000000000000000..a3515a97f15f683a92dc78bf5fda8a628097a7ea --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java @@ -0,0 +1,29 @@ +package eu.stratosphere.streaming.api; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import eu.stratosphere.nephele.event.task.AbstractTaskEvent; + +public class AckEvent extends AbstractTaskEvent { + private Long recordId; + + public AckEvent(Long recordId) { + setRecordId(recordId); + System.out.println("created " + recordId); + } + + @Override + public void write(DataOutput out) throws IOException {} + + @Override + public void read(DataInput in) throws IOException {} + + public void setRecordId(Long recordId) { + this.recordId = recordId; + } + public Long getRecordId() { + return this.recordId; + } +} \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java index ed3b7bf33c033a72e4d6ee89ccd870cecd9c152d..7bf12728819c5048bdd31545613dff2f11a1dc03 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java @@ -15,10 +15,15 @@ package eu.stratosphere.streaming.api; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.LinkedList; import java.util.List; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.event.task.AbstractTaskEvent; +import eu.stratosphere.nephele.event.task.EventListener; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordWriter; @@ -71,8 +76,25 @@ public class StreamTask extends AbstractTask { } setUserFunction(taskConfiguration); + setAckListener(); } + public void setAckListener() { + EventListener eventListener = new EventListener() { + @Override + public void eventOccurred(AbstractTaskEvent event) { + AckEvent ackEvent = (AckEvent) event; + Long recordId = ackEvent.getRecordId(); + System.out.println("acked " + recordId); + //TODO: resend record with the given id + } + }; + for (RecordWriter output : outputs) { + //TODO: separate outputs + output.subscribeToEvent(eventListener, AckEvent.class); + } + } + public void setUserFunction(Configuration taskConfiguration) { Class userFunctionClass = taskConfiguration @@ -129,8 +151,8 @@ public class StreamTask extends AbstractTask { hasInput = true; StreamRecord streamRecord = new StreamRecord(input.next()); Long id = streamRecord.popId(); - userFunction.invoke(streamRecord.getRecord()); + input.publishEvent(new AckEvent(id)); //TODO: ack here } }