提交 f350b9da 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] Logging & WordCount bug

上级 abf758de
......@@ -125,13 +125,11 @@ public class FaultToleranceBuffer {
public void addTimestamp(String recordID) {
Long currentTime = System.currentTimeMillis();
recordTimestamps.put(recordID, currentTime);
Set<String> recordSet = recordsByTime.get(currentTime);
if (recordSet != null) {
recordSet.add(recordID);
if (recordsByTime.containsKey(currentTime)) {
recordsByTime.get(currentTime).add(recordID);
} else {
recordSet = new HashSet<String>();
Set<String> recordSet = new HashSet<String>();
recordSet.add(recordID);
recordsByTime.put(currentTime, recordSet);
}
......@@ -145,7 +143,9 @@ public class FaultToleranceBuffer {
*/
public StreamRecord popRecord(String recordID) {
System.out.println("Pop ID: " + recordID);
return removeRecord(recordID);
StreamRecord record = recordBuffer.get(recordID);
removeRecord(recordID);
return record;
}
/**
......@@ -156,18 +156,18 @@ public class FaultToleranceBuffer {
* The ID of the record that will be removed
*
*/
StreamRecord removeRecord(String recordID) {
void removeRecord(String recordID) {
recordBuffer.remove(recordID);
ackCounter.remove(recordID);
try {
recordsByTime.get(recordTimestamps.remove(recordID)).remove(recordID);
Long ts = recordTimestamps.remove(recordID);
recordsByTime.get(ts).remove(recordID);
} catch (NullPointerException e) {
} catch (Exception e) {
e.printStackTrace();
System.out.println(recordID);
}
return recordBuffer.remove(recordID);
}
/**
......@@ -180,7 +180,7 @@ public class FaultToleranceBuffer {
// TODO: find a place to call timeoutRecords
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
Integer ackCount = ackCounter.get(recordID) - 1;
int ackCount = ackCounter.get(recordID) - 1;
if (ackCount == 0) {
removeRecord(recordID);
......@@ -188,6 +188,7 @@ public class FaultToleranceBuffer {
ackCounter.put(recordID, ackCount);
}
}
// timeoutRecords(System.currentTimeMillis());
}
/**
......
......@@ -14,10 +14,11 @@ import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
/**
* Object for storing serializable records in batch (single records are
* Object for storing serializable records in batch(single records are
* represented batches with one element) used for sending records between task
* objects in Stratosphere stream processing. The elements of the batch are
* Value arrays.
*
*/
public class StreamRecord implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
......@@ -44,7 +45,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Number of fields in the records
*/
public StreamRecord(int length) {
numOfFields = length;
this.numOfFields = length;
recordBatch = new ArrayList<Value[]>();
}
......@@ -58,8 +59,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Number of records
*/
public StreamRecord(int length, int batchSize) {
numOfFields = length;
numOfRecords = batchSize;
this.numOfFields = length;
this.numOfRecords = batchSize;
recordBatch = new ArrayList<Value[]>(batchSize);
}
......@@ -71,11 +72,14 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Array containing the Values for the first record in the batch
*/
public StreamRecord(Value... values) {
this(values.length, 1);
numOfFields = values.length;
recordBatch = new ArrayList<Value[]>();
recordBatch.add(values);
numOfRecords = 1;
}
/**
*
* @return Number of fields in the records
*/
public int getNumOfFields() {
......@@ -83,6 +87,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
*
* @return Number of records in the batch
*/
public int getNumOfRecords() {
......@@ -102,6 +107,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
*
* @return The ID of the object
*/
public String getId() {
......@@ -135,57 +141,13 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @return Value of the field
*/
public Value getField(int fieldNumber) {
return getField(0, fieldNumber);
}
/**
* Sets a field in the given position of a specific record in the batch
*
* @param recordNumber
* Position of record in batch
* @param fieldNumber
* Position of field in record
* @param value
* Value to set
*/
public void setField(int recordNumber, int fieldNumber, Value value) {
try {
recordBatch.get(recordNumber)[fieldNumber] = value;
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
/**
* Sets a field in the given position of the first record in the batch
*
* @param fieldNumber
* Position of the field in the record
*/
public void setField(int fieldNumber, Value value) {
setField(0, fieldNumber, value);
}
/**
* @param recordNumber
* Position of the record in the batch
* @return Value array containing the fields of the record
*/
public Value[] getRecord(int recordNumber) {
try {
return recordBatch.get(recordNumber);
return recordBatch.get(0)[fieldNumber];
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
/**
* @return Value array containing the fields of first the record
*/
public Value[] getRecord() {
return getRecord(0);
}
/**
* Sets a record at the given position in the batch
*
......@@ -225,6 +187,15 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* @param recordNumber
* Position of the record in the batch
* @return Value array containing the fields of the record
*/
public Value[] getRecord(int recordNumber) {
return recordBatch.get(recordNumber);
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Value array to the end of the batch
......@@ -248,8 +219,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public StreamRecord copy() {
StreamRecord copiedRecord = new StreamRecord(this.numOfFields, this.numOfRecords);
copiedRecord.uid = this.uid;
for (Value[] record : recordBatch) {
copiedRecord.recordBatch.add(record);
}
......
......@@ -39,7 +39,7 @@ public class WordCount extends TestBase2 {
root.setLevel(Level.DEBUG);
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
......
......@@ -25,6 +25,8 @@ public class WordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private StringValue wordValue = new StringValue();
// private StreamRecord outputRecord = new StreamRecord(1);
@Override
public void invoke(StreamRecord record) throws Exception {
sentence = (StringValue) record.getRecord(0)[0];
......@@ -32,6 +34,8 @@ public class WordCountSplitter extends UserTaskInvokable {
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
// outputRecord.setRecord(wordValue);
// emit(outputRecord);
emit(new StreamRecord(wordValue));
}
}
......
......@@ -39,7 +39,7 @@ public class FaultToleranceBufferTest {
record.addRecord(new StringValue("V1"));
faultTolerancyBuffer.addRecord(record);
assertEquals((Integer) 3, faultTolerancyBuffer.getAckCounter().get(record.getId()));
assertArrayEquals(record.getRecord(0),faultTolerancyBuffer.getRecordBuffer().get(record.getId()).getRecord(0));
assertEquals(record,faultTolerancyBuffer.getRecordBuffer().get(record.getId()));
}
@Test
......@@ -88,7 +88,7 @@ public class FaultToleranceBufferTest {
record1.addRecord(new StringValue("V1"));
faultTolerancyBuffer.addRecord(record1);
assertArrayEquals(record1.getRecord(0), faultTolerancyBuffer.popRecord(record1.getId()).getRecord(0));
assertEquals(record1, faultTolerancyBuffer.popRecord(record1.getId()));
System.out.println("---------");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册