提交 84f7a459 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] ExactlyOnceBufferTest added

上级 393cf18c
......@@ -28,9 +28,9 @@ import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
import eu.stratosphere.streaming.test.RandIS;
import eu.stratosphere.streaming.test.DummyIS;
public class StreamSource extends AbstractInputTask<RandIS> {
public class StreamSource extends AbstractInputTask<DummyIS> {
private static final Log log = LogFactory.getLog(StreamSource.class);
......@@ -54,12 +54,12 @@ public class StreamSource extends AbstractInputTask<RandIS> {
}
@Override
public RandIS[] computeInputSplits(int requestedMinNumber) throws Exception {
public DummyIS[] computeInputSplits(int requestedMinNumber) throws Exception {
return null;
}
@Override
public Class<RandIS> getInputSplitType() {
public Class<DummyIS> getInputSplitType() {
return null;
}
......
......@@ -47,6 +47,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
private int numOfRecords;
// private Random rnd = new Random();
//TODO implement equals, clone
/**
* Creates a new empty batch of records and sets the field number to one
*/
......
......@@ -23,7 +23,7 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class ExactlyOnceFaultToleranceBuffer extends FaultToleranceBuffer {
private Map<String, int[]> ackCounter;
protected Map<String, int[]> ackCounter;
int[] initialAckCounts;
public ExactlyOnceFaultToleranceBuffer(int[] numberOfChannels, String componentInstanceID) {
......@@ -41,7 +41,7 @@ public class ExactlyOnceFaultToleranceBuffer extends FaultToleranceBuffer {
}
private void addToAckCounter(String id, int channel) {
protected void addToAckCounter(String id, int channel) {
int[] acks=new int[numberOfEffectiveChannels.length + 1];
acks[0]=numberOfEffectiveChannels.length-1;
acks[channel+1]=numberOfEffectiveChannels[channel];
......@@ -65,7 +65,7 @@ public class ExactlyOnceFaultToleranceBuffer extends FaultToleranceBuffer {
}
}
private boolean decreaseAckCounter(int[] acks, int channel) {
protected boolean decreaseAckCounter(int[] acks, int channel) {
acks[channel + 1]--;
if (acks[channel + 1] == 0) {
......@@ -79,15 +79,9 @@ public class ExactlyOnceFaultToleranceBuffer extends FaultToleranceBuffer {
protected StreamRecord failChannel(String id, int channel) {
if(notAcked(id, channel)){
int[] acks = ackCounter.get(id);
acks[channel + 1] = 0;
acks[0]++;
if(acks[0]==numberOfEffectiveChannels.length){
remove(id);
}
return addToChannel(id, channel);
} else{
......@@ -96,7 +90,7 @@ public class ExactlyOnceFaultToleranceBuffer extends FaultToleranceBuffer {
}
private StreamRecord addToChannel(String id, int channel) {
protected StreamRecord addToChannel(String id, int channel) {
StreamRecord record = recordBuffer.get(id).copy().setId(componentInstanceID);
......@@ -108,7 +102,7 @@ public class ExactlyOnceFaultToleranceBuffer extends FaultToleranceBuffer {
return record;
}
private boolean notAcked(String id, int channel) {
protected boolean notAcked(String id, int channel) {
int[] acks = ackCounter.get(id);
if (acks != null) {
if(acks[channel+1]>0){
......
......@@ -29,7 +29,8 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public abstract class FaultToleranceBuffer {
private static final Log log = LogFactory.getLog(FaultToleranceBuffer.class);
private static final Log log = LogFactory
.getLog(FaultToleranceBuffer.class);
protected Map<String, StreamRecord> recordBuffer;
protected Map<String, Long> recordTimestamps;
......@@ -47,7 +48,7 @@ public abstract class FaultToleranceBuffer {
totalNumberOfEffectiveChannels += i;
}
this.componentInstanceID = componentInstanceID;
this.componentInstanceID=componentInstanceID;
this.timeOfLastUpdate = System.currentTimeMillis();
this.recordBuffer = new HashMap<String, StreamRecord>();
......@@ -65,24 +66,35 @@ public abstract class FaultToleranceBuffer {
addToAckCounter(id);
// ackCounter.put(id, numberOfChannels);
// TODO: remove comments for exactly once processing
// int[] ackCounts = new int[numberOfChannels + 1];
//
// for (int i = 0; i < numberOfOutputChannels.length; i++) {
// ackCounts[i + 1] = numberOfOutputChannels[i];
// }
//
// ackMap.put(id, ackCounts);
log.trace("Record added to buffer: " + id);
}
protected abstract void addToAckCounter(String id);
protected abstract boolean removeFromAckCounter(String id);
protected abstract void ack(String id, int channel);
//TODO:count fails
protected StreamRecord fail(String id) {
StreamRecord newRecord = remove(id).setId(componentInstanceID);
add(newRecord);
return newRecord;
}
protected abstract StreamRecord failChannel(String id, int channel);
protected void addTimestamp(String id) {
Long currentTime = System.currentTimeMillis();
recordTimestamps.put(id, currentTime);
......
......@@ -21,23 +21,18 @@ import java.io.IOException;
import eu.stratosphere.core.io.InputSplit;
public class RandIS implements InputSplit {
public class DummyIS implements InputSplit {
@Override
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
}
@Override
public void read(DataInput in) throws IOException {
// TODO Auto-generated method stub
}
@Override
public int getSplitNumber() {
// TODO Auto-generated method stub
return 0;
}
......
......@@ -84,6 +84,12 @@ public class StreamRecordTest {
b.setRecord(new StringValue("Data"));
assertFalse(((StringValue) a.getField(0)).getValue().equals(((StringValue) b.getField(0)).getValue()));
}
@Test
public void cloneTest() {
StringValue sv = new StringValue("V1");
StreamRecord a = new StreamRecord(sv);
}
@Test
public void exceptionTest() {
......
......@@ -69,103 +69,47 @@ public class AtLeastOnceBufferTest {
@Test
public void testAck() {
StreamRecord record1 = new StreamRecord(new StringValue("R1")).setId("1");
String id = record1.getId();
buffer.add(record1);
assertEquals((Integer) 3, buffer.ackCounter.get(id));
buffer.ack(id, 1);
assertEquals((Integer) 2, buffer.ackCounter.get(id));
buffer.ack(id, 1);
assertEquals((Integer) 1, buffer.ackCounter.get(id));
buffer.ack(id, 1);
assertFalse(buffer.ackCounter.containsKey(id));
assertFalse(buffer.recordBuffer.containsKey(id));
assertFalse(buffer.recordTimestamps.containsKey(id));
// fail("Not yet implemented");
}
@Test
public void testFailChannel() {
// fail("Not yet implemented");
}
@Test
public void testAtLeastOnceFaultToleranceBuffer() {
numberOfChannels = new int[] { 2, 2, 2 };
buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, "2");
assertArrayEquals(numberOfChannels, buffer.numberOfEffectiveChannels);
assertEquals("2", buffer.componentInstanceID);
assertEquals(6, buffer.totalNumberOfEffectiveChannels);
// fail("Not yet implemented");
}
@Test
public void testFaultToleranceBuffer() {
// fail("Not yet implemented");
}
@Test
public void testAdd() {
StreamRecord record1 = new StreamRecord(new StringValue("R1")).setId("1");
String id1 = record1.getId();
Long nt = System.nanoTime();
buffer.add(record1);
System.out.println("ADD - " + " exec. time (ns): " + (System.nanoTime() - nt));
record1.setRecord(new StringValue("R2"));
record1.setRecord(new StringValue("R1"));
record1.setId("1");
String id2 = record1.getId();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
buffer.add(record1);
assertEquals((Integer) 3, buffer.ackCounter.get(id1));
assertEquals((Integer) 3, buffer.ackCounter.get(id2));
assertEquals(new StringValue("R1"), buffer.recordBuffer.get(id1).getField(0));
assertEquals(id1, buffer.recordBuffer.get(id1).getId());
assertEquals(new StringValue("R2"), buffer.recordBuffer.get(id2).getField(0));
assertEquals(id2, buffer.recordBuffer.get(id2).getId());
assertEquals(2, buffer.recordTimestamps.size());
assertEquals(2, buffer.recordsByTime.size());
assertEquals(2, buffer.recordBuffer.size());
assertEquals(2, buffer.ackCounter.size());
}
@Test
public void testFail() {
StreamRecord record1 = new StreamRecord(new StringValue("R1")).setId("1");
String id1 = record1.getId();
buffer.add(record1);
buffer.ack(id1, 1);
buffer.ack(id1, 1);
assertEquals(1, buffer.recordBuffer.size());
assertEquals(1, buffer.recordTimestamps.size());
assertEquals(1, buffer.ackCounter.size());
StreamRecord failed = buffer.fail(id1);
String id2 = failed.getId();
assertFalse(buffer.ackCounter.containsKey(id1));
assertFalse(buffer.recordBuffer.containsKey(id1));
assertFalse(buffer.recordTimestamps.containsKey(id1));
assertTrue(buffer.ackCounter.containsKey(id2));
assertTrue(buffer.recordBuffer.containsKey(id2));
assertTrue(buffer.recordTimestamps.containsKey(id2));
assertEquals(1, buffer.recordBuffer.size());
assertEquals(1, buffer.recordTimestamps.size());
assertEquals(1, buffer.ackCounter.size());
// fail("Not yet implemented");
}
@Test
......@@ -184,44 +128,7 @@ public class AtLeastOnceBufferTest {
@Test
public void testRemove() {
StreamRecord record1 = new StreamRecord(new StringValue("R1")).setId("1");
String id1 = record1.getId();
buffer.add(record1);
record1.setRecord(new StringValue("R2"));
record1.setId("1");
String id2 = record1.getId();
buffer.add(record1);
assertTrue(buffer.ackCounter.containsKey(id1));
assertTrue(buffer.recordBuffer.containsKey(id1));
assertTrue(buffer.recordTimestamps.containsKey(id1));
assertTrue(buffer.ackCounter.containsKey(id2));
assertTrue(buffer.recordBuffer.containsKey(id2));
assertTrue(buffer.recordTimestamps.containsKey(id2));
assertEquals(2, buffer.recordBuffer.size());
assertEquals(2, buffer.recordTimestamps.size());
assertEquals(2, buffer.ackCounter.size());
StreamRecord removed = buffer.remove(id1);
assertEquals(new StringValue("R1"), removed.getField(0));
assertEquals(id1, removed.getId());
assertFalse(buffer.ackCounter.containsKey(id1));
assertFalse(buffer.recordBuffer.containsKey(id1));
assertFalse(buffer.recordTimestamps.containsKey(id1));
assertTrue(buffer.ackCounter.containsKey(id2));
assertTrue(buffer.recordBuffer.containsKey(id2));
assertTrue(buffer.recordTimestamps.containsKey(id2));
assertEquals(1, buffer.recordBuffer.size());
assertEquals(1, buffer.recordTimestamps.size());
assertEquals(1, buffer.ackCounter.size());
// fail("Not yet implemented");
}
}
......@@ -20,60 +20,131 @@ import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Test;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.types.StringValue;
public class ExactlyOnceBufferTest {
ExactlyOnceFaultToleranceBuffer buffer;
int[] numberOfChannels;
@Before
public void setUp() throws Exception {
numberOfChannels = new int[] { 1, 2, 2 };
buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, "1");
}
@Test
public void testAddToAckCounter() {
// fail("Not yet implemented");
StreamRecord record1 = new StreamRecord(new StringValue("R1")).setId("1");
buffer.addToAckCounter(record1.getId());
assertArrayEquals(new int[] { 0, 1, 2, 2 }, buffer.ackCounter.get(record1.getId()));
StreamRecord record2 = new StreamRecord(new StringValue("R2")).setId("1");
buffer.addToAckCounter(record2.getId(), 1);
int[] acks = buffer.ackCounter.get(record2.getId());
assertArrayEquals(new int[] { 2, 0, 2, 0 }, acks);
StreamRecord record3 = new StreamRecord(new StringValue("R3")).setId("1");
buffer.addToAckCounter(record3.getId(), 0);
acks = buffer.ackCounter.get(record3.getId());
assertArrayEquals(new int[] { 2, 1, 0, 0 }, acks);
}
@Test
public void testRemoveFromAckCounter() {
// fail("Not yet implemented");
StreamRecord record1 = new StreamRecord(new StringValue("R1")).setId("1");
StreamRecord record2 = new StreamRecord(new StringValue("R2")).setId("1");
buffer.addToAckCounter(record1.getId());
buffer.addToAckCounter(record2.getId());
assertEquals(2, buffer.ackCounter.size());
buffer.removeFromAckCounter(record2.getId());
assertEquals(1, buffer.ackCounter.size());
assertArrayEquals(new int[] { 0, 1, 2, 2 }, buffer.ackCounter.get(record1.getId()));
assertFalse(buffer.ackCounter.containsKey(record2.getId()));
StreamRecord record3 = new StreamRecord(new StringValue("R3")).setId("1");
StreamRecord record4 = new StreamRecord(new StringValue("R4")).setId("1");
buffer.addToAckCounter(record3.getId(), 0);
buffer.addToAckCounter(record4.getId(), 2);
assertEquals(3, buffer.ackCounter.size());
buffer.removeFromAckCounter(record3.getId());
assertEquals(2, buffer.ackCounter.size());
assertTrue(buffer.ackCounter.containsKey(record4.getId()));
assertFalse(buffer.ackCounter.containsKey(record3.getId()));
}
@Test
public void testAck() {
// fail("Not yet implemented");
StreamRecord record1 = new StreamRecord(new StringValue("R1")).setId("1");
String id = record1.getId();
buffer.add(record1);
assertArrayEquals(new int[] { 0, 1, 2, 2 }, buffer.ackCounter.get(id));
buffer.ack(id, 0);
assertArrayEquals(new int[] { 1, 0, 2, 2 }, buffer.ackCounter.get(id));
buffer.ack(id, 1);
assertArrayEquals(new int[] { 1, 0, 1, 2 }, buffer.ackCounter.get(id));
buffer.ack(id, 1);
assertArrayEquals(new int[] { 2, 0, 0, 2 }, buffer.ackCounter.get(id));
buffer.ack(id, 2);
buffer.ack(id, 2);
assertFalse(buffer.ackCounter.containsKey(id));
}
@Test
public void testFailChannel() {
// fail("Not yet implemented");
StreamRecord record1 = new StreamRecord(new StringValue("R1")).setId("1");
String id = record1.getId();
buffer.add(record1);
assertArrayEquals(new int[] { 0, 1, 2, 2 }, buffer.ackCounter.get(id));
StreamRecord failedRecord = buffer.failChannel(id, 1);
assertArrayEquals(new int[] { 1, 1, 0, 2 }, buffer.ackCounter.get(id));
assertArrayEquals(new int[] { 2, 0, 2, 0 }, buffer.ackCounter.get(failedRecord.getId()));
assertEquals(2, buffer.ackCounter.size());
buffer.ack(id, 1);
assertArrayEquals(new int[] { 1, 1, -1, 2 }, buffer.ackCounter.get(id));
assertEquals(2, buffer.ackCounter.size());
assertEquals(null, buffer.failChannel(id, 1));
assertArrayEquals(new int[] { 1, 1, -1, 2 }, buffer.ackCounter.get(id));
assertArrayEquals(new int[] { 2, 0, 2, 0 }, buffer.ackCounter.get(failedRecord.getId()));
assertEquals(2, buffer.ackCounter.size());
buffer.failChannel(failedRecord.getId(), 1);
assertFalse(buffer.ackCounter.containsKey(failedRecord.getId()));
}
@Test
public void testExactlyOnceFaultToleranceBuffer() {
// fail("Not yet implemented");
// fail("Not yet implemented");
}
@Test
public void testFaultToleranceBuffer() {
// fail("Not yet implemented");
// fail("Not yet implemented");
}
@Test
public void testAdd() {
// fail("Not yet implemented");
// fail("Not yet implemented");
}
@Test
public void testFail() {
// fail("Not yet implemented");
}
@Test
public void testAddTimestamp() {
// fail("Not yet implemented");
// fail("Not yet implemented");
}
@Test
public void testRemove() {
// fail("Not yet implemented");
// fail("Not yet implemented");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册