diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java index b7de58fdd31c1f5e189277694d0f902012056e41..3905fa7b1a62d416a5f68f996d6140abee64b02b 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/FaultToleranceBuffer.java @@ -125,11 +125,13 @@ public class FaultToleranceBuffer { public void addTimestamp(String recordID) { Long currentTime = System.currentTimeMillis(); recordTimestamps.put(recordID, currentTime); - - if (recordsByTime.containsKey(currentTime)) { - recordsByTime.get(currentTime).add(recordID); + + Set recordSet = recordsByTime.get(currentTime); + + if (recordSet != null) { + recordSet.add(recordID); } else { - Set recordSet = new HashSet(); + recordSet = new HashSet(); recordSet.add(recordID); recordsByTime.put(currentTime, recordSet); } @@ -143,9 +145,7 @@ public class FaultToleranceBuffer { */ public StreamRecord popRecord(String recordID) { System.out.println("Pop ID: " + recordID); - StreamRecord record = recordBuffer.get(recordID); - removeRecord(recordID); - return record; + return removeRecord(recordID); } /** @@ -156,18 +156,18 @@ public class FaultToleranceBuffer { * The ID of the record that will be removed * */ - void removeRecord(String recordID) { - recordBuffer.remove(recordID); + StreamRecord removeRecord(String recordID) { + ackCounter.remove(recordID); try { - Long ts = recordTimestamps.remove(recordID); - recordsByTime.get(ts).remove(recordID); + recordsByTime.get(recordTimestamps.remove(recordID)).remove(recordID); } catch (NullPointerException e) { } catch (Exception e) { e.printStackTrace(); System.out.println(recordID); } + return recordBuffer.remove(recordID); } /** @@ -180,7 +180,7 @@ public class FaultToleranceBuffer { // TODO: find a place to call timeoutRecords public void ackRecord(String recordID) { if (ackCounter.containsKey(recordID)) { - int ackCount = ackCounter.get(recordID) - 1; + Integer ackCount = ackCounter.get(recordID) - 1; if (ackCount == 0) { removeRecord(recordID); @@ -188,7 +188,6 @@ public class FaultToleranceBuffer { ackCounter.put(recordID, ackCount); } } - // timeoutRecords(System.currentTimeMillis()); } /** diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamRecord.java index 5d0e7663b49f8738323472d89414b4fcb3b4ac55..776f9d85851a2778279d4aafd51226b64126e186 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/StreamRecord.java @@ -14,11 +14,10 @@ import eu.stratosphere.types.StringValue; import eu.stratosphere.types.Value; /** - * Object for storing serializable records in batch(single records are + * Object for storing serializable records in batch (single records are * represented batches with one element) used for sending records between task * objects in Stratosphere stream processing. The elements of the batch are * Value arrays. - * */ public class StreamRecord implements IOReadableWritable, Serializable { private static final long serialVersionUID = 1L; @@ -45,7 +44,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Number of fields in the records */ public StreamRecord(int length) { - this.numOfFields = length; + numOfFields = length; recordBatch = new ArrayList(); } @@ -59,8 +58,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Number of records */ public StreamRecord(int length, int batchSize) { - this.numOfFields = length; - this.numOfRecords = batchSize; + numOfFields = length; + numOfRecords = batchSize; recordBatch = new ArrayList(batchSize); } @@ -72,14 +71,11 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Array containing the Values for the first record in the batch */ public StreamRecord(Value... values) { - numOfFields = values.length; - recordBatch = new ArrayList(); + this(values.length, 1); recordBatch.add(values); - numOfRecords = 1; } /** - * * @return Number of fields in the records */ public int getNumOfFields() { @@ -87,7 +83,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { } /** - * * @return Number of records in the batch */ public int getNumOfRecords() { @@ -107,7 +102,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { } /** - * * @return The ID of the object */ public String getId() { @@ -141,13 +135,57 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @return Value of the field */ public Value getField(int fieldNumber) { + return getField(0, fieldNumber); + } + + /** + * Sets a field in the given position of a specific record in the batch + * + * @param recordNumber + * Position of record in batch + * @param fieldNumber + * Position of field in record + * @param value + * Value to set + */ + public void setField(int recordNumber, int fieldNumber, Value value) { + try { + recordBatch.get(recordNumber)[fieldNumber] = value; + } catch (IndexOutOfBoundsException e) { + throw (new NoSuchRecordException()); + } + } + + /** + * Sets a field in the given position of the first record in the batch + * + * @param fieldNumber + * Position of the field in the record + */ + public void setField(int fieldNumber, Value value) { + setField(0, fieldNumber, value); + } + + /** + * @param recordNumber + * Position of the record in the batch + * @return Value array containing the fields of the record + */ + public Value[] getRecord(int recordNumber) { try { - return recordBatch.get(0)[fieldNumber]; + return recordBatch.get(recordNumber); } catch (IndexOutOfBoundsException e) { throw (new NoSuchRecordException()); } } + /** + * @return Value array containing the fields of first the record + */ + public Value[] getRecord() { + return getRecord(0); + } + /** * Sets a record at the given position in the batch * @@ -187,15 +225,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { } } - /** - * @param recordNumber - * Position of the record in the batch - * @return Value array containing the fields of the record - */ - public Value[] getRecord(int recordNumber) { - return recordBatch.get(recordNumber); - } - /** * Checks if the number of fields are equal to the batch field size then * adds the Value array to the end of the batch @@ -219,6 +248,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { */ public StreamRecord copy() { StreamRecord copiedRecord = new StreamRecord(this.numOfFields, this.numOfRecords); + copiedRecord.uid = this.uid; + for (Value[] record : recordBatch) { copiedRecord.recordBatch.add(record); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokable.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokable.java index 6f91adee24b622d15676cccdae9c145d5ea9aef8..ee4032fdfcbdb7f33fcec15211f88fb581b82d2f 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokable.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamcomponent/StreamInvokable.java @@ -24,20 +24,20 @@ public abstract class StreamInvokable { record.setId(channelID); emittedRecords.addRecord(record); - - for (RecordWriter output : outputs) { - try { + try { + for (RecordWriter output : outputs) { + output.emit(record); -// System.out.println(this.getClass().getName()); -// System.out.println("Emitted " + record.getId() + "-" -// + record.toString()); -// System.out.println("---------------------"); + // System.out.println(this.getClass().getName()); + // System.out.println("Emitted " + record.getId() + "-" + // + record.toString()); + // System.out.println("---------------------"); - } catch (Exception e) { - System.out.println("Emit error: " + e.getMessage()); - emittedRecords.failRecord(record.getId()); } + } catch (Exception e) { + System.out.println("Emit error: " + e.getMessage()); + emittedRecords.failRecord(record.getId()); } } -} \ No newline at end of file +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java index 4be0db8999dad40eadf27061c9ef5a914c46a807..00918312b07efdfd8cd6d3c8523243ddbe548133 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/test/wordcount/WordCountSplitter.java @@ -23,9 +23,9 @@ public class WordCountSplitter extends UserTaskInvokable { private StringValue sentence = new StringValue(); private String[] words = new String[] {}; - private StringValue wordValue = new StringValue(); + private StringValue wordValue = new StringValue(""); - // private StreamRecord outputRecord = new StreamRecord(1); + private StreamRecord outputRecord = new StreamRecord(wordValue); @Override public void invoke(StreamRecord record) throws Exception { @@ -34,9 +34,9 @@ public class WordCountSplitter extends UserTaskInvokable { words = sentence.getValue().split(" "); for (CharSequence word : words) { wordValue.setValue(word); - // outputRecord.setRecord(wordValue); - // emit(outputRecord); - emit(new StreamRecord(wordValue)); + outputRecord.setRecord(wordValue); + emit(outputRecord); + //emit(new StreamRecord(wordValue)); } } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FaultToleranceBufferTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FaultToleranceBufferTest.java index 82fbe910195ab3146408d08cf390d4b74ed09469..205132066cac8bb265fff9b77c66466b28d17d32 100644 --- a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FaultToleranceBufferTest.java +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/api/FaultToleranceBufferTest.java @@ -39,7 +39,7 @@ public class FaultToleranceBufferTest { record.addRecord(new StringValue("V1")); faultTolerancyBuffer.addRecord(record); assertEquals((Integer) 3, faultTolerancyBuffer.getAckCounter().get(record.getId())); - assertEquals(record,faultTolerancyBuffer.getRecordBuffer().get(record.getId())); + assertArrayEquals(record.getRecord(0),faultTolerancyBuffer.getRecordBuffer().get(record.getId()).getRecord(0)); } @Test @@ -88,7 +88,7 @@ public class FaultToleranceBufferTest { record1.addRecord(new StringValue("V1")); faultTolerancyBuffer.addRecord(record1); - assertEquals(record1, faultTolerancyBuffer.popRecord(record1.getId())); + assertArrayEquals(record1.getRecord(0), faultTolerancyBuffer.popRecord(record1.getId()).getRecord(0)); System.out.println("---------"); }