diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java index 3f943c1732350ee2c9f752b51fb678d133650219..28a57b682e864fef9be057a71fd67bd5cacf163c 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultTolerancyBuffer.java @@ -26,10 +26,13 @@ import java.util.TreeMap; import eu.stratosphere.nephele.io.RecordWriter; +/** An object to provide fault tolerance for Stratosphere stream processing. + * It works as a buffer to hold StreamRecords for a task for re-emitting failed, or timed out records. + */ public class FaultTolerancyBuffer { private long TIMEOUT = 1000; - + private Long timeOfLastUpdate; private Map recordBuffer; private Map ackCounter; @@ -41,6 +44,13 @@ public class FaultTolerancyBuffer { private int numberOfOutputs; + /**Creates fault tolerance buffer object for the given output channels and channel ID + * + * @param outputs + * List of outputs + * @param channelID + * ID of the task object that uses this buffer + */ public FaultTolerancyBuffer(List> outputs, String channelID) { this.timeOfLastUpdate = System.currentTimeMillis(); @@ -53,6 +63,9 @@ public class FaultTolerancyBuffer { this.recordTimestamps = new HashMap(); } +/** Adds the record to the fault tolerance buffer. This record will be monitored for acknowledgements and timeout. + * + */ public void addRecord(StreamRecord streamRecord) { recordBuffer.put(streamRecord.getId(), streamRecord); @@ -60,6 +73,13 @@ public class FaultTolerancyBuffer { addTimestamp(streamRecord.getId()); } + /** Checks for records that have timed out since the last check and fails them. + * + * @param currentTime + * Time when the check should be made, usually current system time. + * @return + * Returns the list of the records that have timed out. + */ List timeoutRecords(Long currentTime) { if (timeOfLastUpdate + TIMEOUT < currentTime) { List timedOutRecords = new LinkedList(); @@ -85,6 +105,14 @@ public class FaultTolerancyBuffer { return null; } + /**Stores time stamp for a record by recordID + * and also adds the record to a map which maps a time stamp to the IDs of records that were emitted at that time. + *

+ * Later used for timeouts. + * + * @param recordID + * ID of the record + */ public void addTimestamp(String recordID) { Long currentTime = System.currentTimeMillis(); recordTimestamps.put(recordID, currentTime); @@ -98,6 +126,11 @@ public class FaultTolerancyBuffer { } } + /**Returns a StreamRecord after removing it from the buffer + * + * @param recordID + * The ID of the record that will be popped + */ public StreamRecord popRecord(String recordID) { System.out.println("Pop ID: " + recordID); StreamRecord record = recordBuffer.get(recordID); @@ -105,6 +138,11 @@ public class FaultTolerancyBuffer { return record; } + /** Removes a StreamRecord by ID from the fault tolerance buffer, further acks will have no effects for this record. + * @param recordID + * The ID of the record that will be removed + * + */ void removeRecord(String recordID) { recordBuffer.remove(recordID); ackCounter.remove(recordID); @@ -119,6 +157,11 @@ public class FaultTolerancyBuffer { } } + /**Acknowledges the record of the given ID, if all the outputs have sent acknowledgments, removes it from the buffer + * + * @param recordID + * ID of the record that has been acknowledged + */ //TODO: find a place to call timeoutRecords public void ackRecord(String recordID) { if (ackCounter.containsKey(recordID)) { @@ -133,6 +176,11 @@ public class FaultTolerancyBuffer { //timeoutRecords(System.currentTimeMillis()); } + /**Re-emits the failed record for the given ID, removes the old record and stores it with a new ID. + * + * @param recordID + * ID of the record that has been failed + */ public void failRecord(String recordID) { // Create new id to avoid double counting acks System.out.println("Fail ID: " + recordID); @@ -140,7 +188,12 @@ public class FaultTolerancyBuffer { addRecord(newRecord); reEmit(newRecord); } - + +/** + * Emit give record to all output channels + * @param record + * Record to be re-emitted + */ public void reEmit(StreamRecord record) { for (RecordWriter output : outputs) { try {