提交 abf758de 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] fault tolerance update

上级 bd81e2c3
......@@ -125,11 +125,13 @@ public class FaultToleranceBuffer {
public void addTimestamp(String recordID) {
Long currentTime = System.currentTimeMillis();
recordTimestamps.put(recordID, currentTime);
if (recordsByTime.containsKey(currentTime)) {
recordsByTime.get(currentTime).add(recordID);
Set<String> recordSet = recordsByTime.get(currentTime);
if (recordSet != null) {
recordSet.add(recordID);
} else {
Set<String> recordSet = new HashSet<String>();
recordSet = new HashSet<String>();
recordSet.add(recordID);
recordsByTime.put(currentTime, recordSet);
}
......@@ -143,9 +145,7 @@ public class FaultToleranceBuffer {
*/
public StreamRecord popRecord(String recordID) {
System.out.println("Pop ID: " + recordID);
StreamRecord record = recordBuffer.get(recordID);
removeRecord(recordID);
return record;
return removeRecord(recordID);
}
/**
......@@ -156,18 +156,18 @@ public class FaultToleranceBuffer {
* The ID of the record that will be removed
*
*/
void removeRecord(String recordID) {
recordBuffer.remove(recordID);
StreamRecord removeRecord(String recordID) {
ackCounter.remove(recordID);
try {
Long ts = recordTimestamps.remove(recordID);
recordsByTime.get(ts).remove(recordID);
recordsByTime.get(recordTimestamps.remove(recordID)).remove(recordID);
} catch (NullPointerException e) {
} catch (Exception e) {
e.printStackTrace();
System.out.println(recordID);
}
return recordBuffer.remove(recordID);
}
/**
......@@ -180,7 +180,7 @@ public class FaultToleranceBuffer {
// TODO: find a place to call timeoutRecords
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
int ackCount = ackCounter.get(recordID) - 1;
Integer ackCount = ackCounter.get(recordID) - 1;
if (ackCount == 0) {
removeRecord(recordID);
......@@ -188,7 +188,6 @@ public class FaultToleranceBuffer {
ackCounter.put(recordID, ackCount);
}
}
// timeoutRecords(System.currentTimeMillis());
}
/**
......
......@@ -39,7 +39,7 @@ public class FaultToleranceBufferTest {
record.addRecord(new StringValue("V1"));
faultTolerancyBuffer.addRecord(record);
assertEquals((Integer) 3, faultTolerancyBuffer.getAckCounter().get(record.getId()));
assertEquals(record,faultTolerancyBuffer.getRecordBuffer().get(record.getId()));
assertArrayEquals(record.getRecord(0),faultTolerancyBuffer.getRecordBuffer().get(record.getId()).getRecord(0));
}
@Test
......@@ -88,7 +88,7 @@ public class FaultToleranceBufferTest {
record1.addRecord(new StringValue("V1"));
faultTolerancyBuffer.addRecord(record1);
assertEquals(record1, faultTolerancyBuffer.popRecord(record1.getId()));
assertArrayEquals(record1.getRecord(0), faultTolerancyBuffer.popRecord(record1.getId()).getRecord(0));
System.out.println("---------");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册