diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java index e9305c71871136d2adecc1ccebdf6fd674e3c45a..5f381d0936e387c6f7ffb136c5c300f7d50a157f 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java @@ -16,6 +16,7 @@ public class FaultTolerancyBuffer { private int numberOfOutputs; public FaultTolerancyBuffer(List> outputs) { + this.outputs=outputs; this.recordBuffer = new HashMap(); this.ackCounter = new HashMap(); this.numberOfOutputs = outputs.size(); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java index 93a994dbe741b95124065a26600e10d9aa96cfc2..784796f8365d606f53cd709d2f8baded316a3b2d 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java @@ -39,6 +39,7 @@ public abstract class StreamInvokable { } catch (Exception e) { System.out.println("Emit error"); + emittedRecords.failRecord(streamRecord.getId()); } } }