From 04a717e08c0c13165bbdf52bca41a4d864463db5 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Mon, 14 Jul 2014 16:28:55 +0200 Subject: [PATCH] [streaming] ack update --- .../java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java | 1 + .../eu/stratosphere/streaming/api/invokable/StreamInvokable.java | 1 + 2 files changed, 2 insertions(+) diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java index e9305c71871..5f381d0936e 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java @@ -16,6 +16,7 @@ public class FaultTolerancyBuffer { private int numberOfOutputs; public FaultTolerancyBuffer(List> outputs) { + this.outputs=outputs; this.recordBuffer = new HashMap(); this.ackCounter = new HashMap(); this.numberOfOutputs = outputs.size(); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java index 93a994dbe74..784796f8365 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/invokable/StreamInvokable.java @@ -39,6 +39,7 @@ public abstract class StreamInvokable { } catch (Exception e) { System.out.println("Emit error"); + emittedRecords.failRecord(streamRecord.getId()); } } } -- GitLab