提交 517eea96 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] wordcount bugfix

上级 f350b9da
......@@ -125,11 +125,13 @@ public class FaultToleranceBuffer {
public void addTimestamp(String recordID) {
Long currentTime = System.currentTimeMillis();
recordTimestamps.put(recordID, currentTime);
if (recordsByTime.containsKey(currentTime)) {
recordsByTime.get(currentTime).add(recordID);
Set<String> recordSet = recordsByTime.get(currentTime);
if (recordSet != null) {
recordSet.add(recordID);
} else {
Set<String> recordSet = new HashSet<String>();
recordSet = new HashSet<String>();
recordSet.add(recordID);
recordsByTime.put(currentTime, recordSet);
}
......@@ -143,9 +145,7 @@ public class FaultToleranceBuffer {
*/
public StreamRecord popRecord(String recordID) {
System.out.println("Pop ID: " + recordID);
StreamRecord record = recordBuffer.get(recordID);
removeRecord(recordID);
return record;
return removeRecord(recordID);
}
/**
......@@ -156,18 +156,18 @@ public class FaultToleranceBuffer {
* The ID of the record that will be removed
*
*/
void removeRecord(String recordID) {
recordBuffer.remove(recordID);
StreamRecord removeRecord(String recordID) {
ackCounter.remove(recordID);
try {
Long ts = recordTimestamps.remove(recordID);
recordsByTime.get(ts).remove(recordID);
recordsByTime.get(recordTimestamps.remove(recordID)).remove(recordID);
} catch (NullPointerException e) {
} catch (Exception e) {
e.printStackTrace();
System.out.println(recordID);
}
return recordBuffer.remove(recordID);
}
/**
......@@ -180,7 +180,7 @@ public class FaultToleranceBuffer {
// TODO: find a place to call timeoutRecords
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
int ackCount = ackCounter.get(recordID) - 1;
Integer ackCount = ackCounter.get(recordID) - 1;
if (ackCount == 0) {
removeRecord(recordID);
......@@ -188,7 +188,6 @@ public class FaultToleranceBuffer {
ackCounter.put(recordID, ackCount);
}
}
// timeoutRecords(System.currentTimeMillis());
}
/**
......
......@@ -14,11 +14,10 @@ import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
/**
* Object for storing serializable records in batch(single records are
* Object for storing serializable records in batch (single records are
* represented batches with one element) used for sending records between task
* objects in Stratosphere stream processing. The elements of the batch are
* Value arrays.
*
*/
public class StreamRecord implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
......@@ -45,7 +44,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Number of fields in the records
*/
public StreamRecord(int length) {
this.numOfFields = length;
numOfFields = length;
recordBatch = new ArrayList<Value[]>();
}
......@@ -59,8 +58,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Number of records
*/
public StreamRecord(int length, int batchSize) {
this.numOfFields = length;
this.numOfRecords = batchSize;
numOfFields = length;
numOfRecords = batchSize;
recordBatch = new ArrayList<Value[]>(batchSize);
}
......@@ -72,14 +71,11 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Array containing the Values for the first record in the batch
*/
public StreamRecord(Value... values) {
numOfFields = values.length;
recordBatch = new ArrayList<Value[]>();
this(values.length, 1);
recordBatch.add(values);
numOfRecords = 1;
}
/**
*
* @return Number of fields in the records
*/
public int getNumOfFields() {
......@@ -87,7 +83,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
*
* @return Number of records in the batch
*/
public int getNumOfRecords() {
......@@ -107,7 +102,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
/**
*
* @return The ID of the object
*/
public String getId() {
......@@ -141,13 +135,57 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @return Value of the field
*/
public Value getField(int fieldNumber) {
return getField(0, fieldNumber);
}
/**
* Sets a field in the given position of a specific record in the batch
*
* @param recordNumber
* Position of record in batch
* @param fieldNumber
* Position of field in record
* @param value
* Value to set
*/
public void setField(int recordNumber, int fieldNumber, Value value) {
try {
recordBatch.get(recordNumber)[fieldNumber] = value;
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
/**
* Sets a field in the given position of the first record in the batch
*
* @param fieldNumber
* Position of the field in the record
*/
public void setField(int fieldNumber, Value value) {
setField(0, fieldNumber, value);
}
/**
* @param recordNumber
* Position of the record in the batch
* @return Value array containing the fields of the record
*/
public Value[] getRecord(int recordNumber) {
try {
return recordBatch.get(0)[fieldNumber];
return recordBatch.get(recordNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchRecordException());
}
}
/**
* @return Value array containing the fields of first the record
*/
public Value[] getRecord() {
return getRecord(0);
}
/**
* Sets a record at the given position in the batch
*
......@@ -187,15 +225,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* @param recordNumber
* Position of the record in the batch
* @return Value array containing the fields of the record
*/
public Value[] getRecord(int recordNumber) {
return recordBatch.get(recordNumber);
}
/**
* Checks if the number of fields are equal to the batch field size then
* adds the Value array to the end of the batch
......@@ -219,6 +248,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*/
public StreamRecord copy() {
StreamRecord copiedRecord = new StreamRecord(this.numOfFields, this.numOfRecords);
copiedRecord.uid = this.uid;
for (Value[] record : recordBatch) {
copiedRecord.recordBatch.add(record);
}
......
......@@ -24,20 +24,20 @@ public abstract class StreamInvokable {
record.setId(channelID);
emittedRecords.addRecord(record);
for (RecordWriter<StreamRecord> output : outputs) {
try {
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
// System.out.println(this.getClass().getName());
// System.out.println("Emitted " + record.getId() + "-"
// + record.toString());
// System.out.println("---------------------");
// System.out.println(this.getClass().getName());
// System.out.println("Emitted " + record.getId() + "-"
// + record.toString());
// System.out.println("---------------------");
} catch (Exception e) {
System.out.println("Emit error: " + e.getMessage());
emittedRecords.failRecord(record.getId());
}
} catch (Exception e) {
System.out.println("Emit error: " + e.getMessage());
emittedRecords.failRecord(record.getId());
}
}
}
\ No newline at end of file
}
......@@ -23,9 +23,9 @@ public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue();
private String[] words = new String[] {};
private StringValue wordValue = new StringValue();
private StringValue wordValue = new StringValue("");
// private StreamRecord outputRecord = new StreamRecord(1);
private StreamRecord outputRecord = new StreamRecord(wordValue);
@Override
public void invoke(StreamRecord record) throws Exception {
......@@ -34,9 +34,9 @@ public class WordCountSplitter extends UserTaskInvokable {
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
// outputRecord.setRecord(wordValue);
// emit(outputRecord);
emit(new StreamRecord(wordValue));
outputRecord.setRecord(wordValue);
emit(outputRecord);
//emit(new StreamRecord(wordValue));
}
}
}
\ No newline at end of file
......@@ -39,7 +39,7 @@ public class FaultToleranceBufferTest {
record.addRecord(new StringValue("V1"));
faultTolerancyBuffer.addRecord(record);
assertEquals((Integer) 3, faultTolerancyBuffer.getAckCounter().get(record.getId()));
assertEquals(record,faultTolerancyBuffer.getRecordBuffer().get(record.getId()));
assertArrayEquals(record.getRecord(0),faultTolerancyBuffer.getRecordBuffer().get(record.getId()).getRecord(0));
}
@Test
......@@ -88,7 +88,7 @@ public class FaultToleranceBufferTest {
record1.addRecord(new StringValue("V1"));
faultTolerancyBuffer.addRecord(record1);
assertEquals(record1, faultTolerancyBuffer.popRecord(record1.getId()));
assertArrayEquals(record1.getRecord(0), faultTolerancyBuffer.popRecord(record1.getId()).getRecord(0));
System.out.println("---------");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册