From 038f4f9679801c46da322fa05b2c049040c4a17f Mon Sep 17 00:00:00 2001 From: gaborhermann Date: Mon, 14 Jul 2014 16:28:54 +0200 Subject: [PATCH] [streaming] Added AckEvent --- .../stratosphere/streaming/api/AckEvent.java | 29 +++++++++++++++++++ .../streaming/api/StreamTask.java | 24 ++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java new file mode 100644 index 00000000000..a3515a97f15 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/AckEvent.java @@ -0,0 +1,29 @@ +package eu.stratosphere.streaming.api; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import eu.stratosphere.nephele.event.task.AbstractTaskEvent; + +public class AckEvent extends AbstractTaskEvent { + private Long recordId; + + public AckEvent(Long recordId) { + setRecordId(recordId); + System.out.println("created " + recordId); + } + + @Override + public void write(DataOutput out) throws IOException {} + + @Override + public void read(DataInput in) throws IOException {} + + public void setRecordId(Long recordId) { + this.recordId = recordId; + } + public Long getRecordId() { + return this.recordId; + } +} \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java index ed3b7bf33c0..7bf12728819 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamTask.java @@ -15,10 +15,15 @@ package eu.stratosphere.streaming.api; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; import java.util.LinkedList; import java.util.List; import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.event.task.AbstractTaskEvent; +import eu.stratosphere.nephele.event.task.EventListener; import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordWriter; @@ -71,8 +76,25 @@ public class StreamTask extends AbstractTask { } setUserFunction(taskConfiguration); + setAckListener(); } + public void setAckListener() { + EventListener eventListener = new EventListener() { + @Override + public void eventOccurred(AbstractTaskEvent event) { + AckEvent ackEvent = (AckEvent) event; + Long recordId = ackEvent.getRecordId(); + System.out.println("acked " + recordId); + //TODO: resend record with the given id + } + }; + for (RecordWriter output : outputs) { + //TODO: separate outputs + output.subscribeToEvent(eventListener, AckEvent.class); + } + } + public void setUserFunction(Configuration taskConfiguration) { Class userFunctionClass = taskConfiguration @@ -129,8 +151,8 @@ public class StreamTask extends AbstractTask { hasInput = true; StreamRecord streamRecord = new StreamRecord(input.next()); Long id = streamRecord.popId(); - userFunction.invoke(streamRecord.getRecord()); + input.publishEvent(new AckEvent(id)); //TODO: ack here } } -- GitLab