Commit 87fd433f authored by Márton Balassi, committed by Stephan Ewen

[streaming] WordCount Refactor

Parent 7ca4718e
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.io.DataInput;
......@@ -20,18 +5,10 @@ import java.io.DataOutput;
import java.io.IOException;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
/**TaskEvent for sending record acknowledgements to the input's fault tolerance buffer
*
*
*/
public class AckEvent extends AbstractTaskEvent {
private String recordId;
/**Creates a new event to acknowledge the record with the given ID
*
* @param recordId
* ID of the record to be acknowledged
*/
public AckEvent(String recordId) {
setRecordId(recordId);
System.out.println("Ack sent " + recordId);
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
/**
* EventListener for record acknowledgement events. When an AckEvent occurs,
* uses the task's fault tolerance buffer to acknowledge the given record.
*/
public class AckEventListener implements EventListener {
private String taskInstanceID;
private FaultTolerancyBuffer recordBuffer;
private FaultToleranceBuffer recordBuffer;
/**
* Creates an AckEventListener that monitors AckEvents sent to task with the
* given ID.
*
* @param taskInstanceID
* ID of the task that creates the listener
* @param recordBuffer
* The fault tolerance buffer associated with this task
*/
public AckEventListener(String taskInstanceID,
FaultTolerancyBuffer recordBuffer) {
FaultToleranceBuffer recordBuffer) {
this.taskInstanceID = taskInstanceID;
this.recordBuffer = recordBuffer;
}
/**
* When an AckEvent occurs, checks whether it was directed at this task and,
* if so, acknowledges the record given in the AckEvent
*
*/
public void eventOccurred(AbstractTaskEvent event) {
AckEvent ackEvent = (AckEvent) event;
......@@ -55,10 +21,9 @@ public class AckEventListener implements EventListener {
String ackCID = recordId.split("-", 2)[0];
if (ackCID.equals(taskInstanceID)) {
Long nt = System.nanoTime();
Long nt= System.nanoTime();
recordBuffer.ackRecord(ackEvent.getRecordId());
System.out.println("Ack recieved " + ackEvent.getRecordId()
+ "\nAck exec. time(ns): " + (System.nanoTime() - nt));
System.out.println("Ack recieved " + ackEvent.getRecordId()+"\nAck exec. time(ns): "+(System.nanoTime()-nt));
System.out.println("--------------");
}
......
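As a minimal sketch of the acknowledgement path (not part of the commit, and assuming the renamed FaultToleranceBuffer keeps the API shown for FaultTolerancyBuffer further down in this diff), the listener can be driven by hand; in a running job the AckEvent is delivered by Nephele's event mechanism instead of a direct call:
package eu.stratosphere.streaming.api; // same package, so the buffer's package-private setters are visible
import java.util.LinkedList;
import java.util.List;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.StringValue;
public class AckRoundTripSketch {
	public static void main(String[] args) {
		List<RecordWriter<StreamRecord>> outputs = new LinkedList<RecordWriter<StreamRecord>>();
		FaultToleranceBuffer buffer = new FaultToleranceBuffer(outputs, "1");
		buffer.setNumberOfOutputs(1); // pretend there is one downstream channel
		// record IDs are prefixed with the emitting task's ID ("1-..."), which the listener checks
		StreamRecord record = new StreamRecord(new StringValue("payload")).setId("1");
		buffer.addRecord(record);
		// an AckEvent whose record ID prefix matches the task ID acknowledges the buffered record
		AckEventListener listener = new AckEventListener("1", buffer);
		listener.eventOccurred(new AckEvent(record.getId()));
	}
}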
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import eu.stratosphere.types.Value;
/**
* Basic record object that holds a fixed number of Values (fields)
*
*
*/
public class AtomRecord {
private Value[] record;
/**
* Creates an AtomRecord with one field
*
*/
public AtomRecord() {
record = new Value[1];
public AtomRecord(){
record=new Value[1];
}
/**
* Creates an AtomRecord with the given number of fields
*
* @param length
* Number of fields
*/
public AtomRecord(int length) {
record = new Value[length];
public AtomRecord(int length){
record=new Value[length];
}
/**
* Creates an AtomRecord from a Value array that will hold the same number of
* fields and values
*
* @param fields
* Value array of the desired fields
*/
public AtomRecord(Value[] fields) {
record = fields;
public AtomRecord(Value[] fields){
record=fields;
}
/**
* Creates an AtomRecord that holds one field with the given Value
*
* @param field
* Desired value
*/
public AtomRecord(Value field) {
record = new Value[1];
record[0] = field;
public AtomRecord(Value fields) {
record=new Value[1];
record[0]=fields;
}
/**
*
* @return Value array containing the fields
*/
public Value[] getFields() {
public Value[] getFields(){
return record;
}
/**
*
* @param fieldNumber
* The position of the desired field
* @return Value in that position
*/
public Value getField(int fieldNumber) {
public Value getField(int fieldNumber){
return record[fieldNumber];
}
/**
*
* @param fieldNumber
* The position of the field that will be set
* @param value
* New value of the field
*/
public void setField(int fieldNumber, Value value) {
record[fieldNumber] = value;
public void setField(int fieldNumber, Value value){
record[fieldNumber]=value;
}
}
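For orientation, a small standalone sketch of how an AtomRecord might be filled and read back, based only on the constructors and accessors above (the field values are made up):
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class AtomRecordSketch {
	public static void main(String[] args) {
		AtomRecord pair = new AtomRecord(2); // record with two empty fields
		pair.setField(0, new StringValue("word")); // fill the first field
		pair.setField(1, new IntValue(1)); // fill the second field
		Value first = pair.getField(0); // read one field back
		Value[] fields = pair.getFields(); // or take the whole backing array
		System.out.println(first + " / " + fields.length + " fields");
	}
}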
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.io.DataInput;
......@@ -21,39 +6,25 @@ import java.io.IOException;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
/**
* TaskEvent for sending record fails to the input's fault tolerance buffer
*
*
*/
public class FailEvent extends AbstractTaskEvent {
private String recordId;
/**
* Creates a new event to fail the record with the given ID
*
* @param recordId
* ID of the record to be failed
*/
public FailEvent(String recordId) {
setRecordId(recordId);
System.out.println("Fail sent " + recordId);
System.out.println("---------------------");
}
@Override
public void write(DataOutput out) throws IOException {
}
public void write(DataOutput out) throws IOException {}
@Override
public void read(DataInput in) throws IOException {
}
public void read(DataInput in) throws IOException {}
public void setRecordId(String recordId) {
this.recordId = recordId;
}
public String getRecordId() {
return this.recordId;
}
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
/**
* EventListener for record fail events. When a FailEvent occurs, uses the
* task's fault tolerance buffer to fail and re-emit the given record.
*/
public class FailEventListener implements EventListener {
private String taskInstanceID;
private FaultTolerancyBuffer recordBuffer;
private FaultToleranceBuffer recordBuffer;
/**
* Creates a FailEventListener that monitors FailEvents sent to task with the
* given ID.
*
* @param taskInstanceID
* ID of the task that creates the listener
* @param recordBuffer
* The fault tolerance buffer associated with this task
*/
public FailEventListener(String taskInstanceID,
FaultTolerancyBuffer recordBuffer) {
FaultToleranceBuffer recordBuffer) {
this.taskInstanceID = taskInstanceID;
this.recordBuffer = recordBuffer;
}
/**
* When a FailEvent occurs, checks whether it was directed at this task and,
* if so, fails the record given in the FailEvent
*
*/
public void eventOccurred(AbstractTaskEvent event) {
FailEvent failEvent = (FailEvent) event;
String recordId = failEvent.getRecordId();
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import eu.stratosphere.nephele.io.RecordWriter;
/**
* An object to provide fault tolerance for Stratosphere stream processing. It
* works as a buffer to hold StreamRecords for a task for re-emitting failed, or
* timed out records.
*/
public class FaultTolerancyBuffer {
private long TIMEOUT = 1000;
private Long timeOfLastUpdate;
private Map<String, StreamRecord> recordBuffer;
private Map<String, Integer> ackCounter;
private SortedMap<Long, Set<String>> recordsByTime;
private Map<String, Long> recordTimestamps;
private List<RecordWriter<StreamRecord>> outputs;
private final String channelID;
private int numberOfOutputs;
/**
* Creates fault tolerance buffer object for the given output channels and
* channel ID
*
* @param outputs
* List of outputs
* @param channelID
* ID of the task object that uses this buffer
*/
public FaultTolerancyBuffer(List<RecordWriter<StreamRecord>> outputs,
String channelID) {
this.timeOfLastUpdate = System.currentTimeMillis();
this.outputs = outputs;
this.recordBuffer = new HashMap<String, StreamRecord>();
this.ackCounter = new HashMap<String, Integer>();
this.numberOfOutputs = outputs.size();
this.channelID = channelID;
this.recordsByTime = new TreeMap<Long, Set<String>>();
this.recordTimestamps = new HashMap<String, Long>();
}
/**
* Adds the record to the fault tolerance buffer. This record will be
* monitored for acknowledgements and timeout.
*
*/
public void addRecord(StreamRecord streamRecord) {
recordBuffer.put(streamRecord.getId(), streamRecord);
ackCounter.put(streamRecord.getId(), numberOfOutputs);
addTimestamp(streamRecord.getId());
}
/**
* Checks for records that have timed out since the last check and fails them.
*
* @param currentTime
* Time when the check should be made, usually current system time.
* @return Returns the list of the records that have timed out.
*/
List<String> timeoutRecords(Long currentTime) {
if (timeOfLastUpdate + TIMEOUT < currentTime) {
List<String> timedOutRecords = new LinkedList<String>();
Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L, currentTime
- TIMEOUT);
for (Set<String> recordSet : timedOut.values()) {
if (!recordSet.isEmpty()) {
for (String recordID : recordSet) {
timedOutRecords.add(recordID);
}
}
}
recordsByTime.keySet().removeAll(timedOut.keySet());
for (String recordID : timedOutRecords) {
failRecord(recordID);
}
timeOfLastUpdate = currentTime;
return timedOutRecords;
}
return null;
}
/**
* Stores time stamp for a record by recordID and also adds the record to a
* map which maps a time stamp to the IDs of records that were emitted at that
* time.
* <p>
* Later used for timeouts.
*
* @param recordID
* ID of the record
*/
public void addTimestamp(String recordID) {
Long currentTime = System.currentTimeMillis();
recordTimestamps.put(recordID, currentTime);
if (recordsByTime.containsKey(currentTime)) {
recordsByTime.get(currentTime).add(recordID);
} else {
Set<String> recordSet = new HashSet<String>();
recordSet.add(recordID);
recordsByTime.put(currentTime, recordSet);
}
}
/**
* Returns a StreamRecord after removing it from the buffer
*
* @param recordID
* The ID of the record that will be popped
*/
public StreamRecord popRecord(String recordID) {
System.out.println("Pop ID: " + recordID);
StreamRecord record = recordBuffer.get(recordID);
removeRecord(recordID);
return record;
}
/**
* Removes a StreamRecord by ID from the fault tolerance buffer, further acks
* will have no effects for this record.
*
* @param recordID
* The ID of the record that will be removed
*
*/
void removeRecord(String recordID) {
recordBuffer.remove(recordID);
ackCounter.remove(recordID);
try {
Long ts = recordTimestamps.remove(recordID);
recordsByTime.get(ts).remove(recordID);
} catch (NullPointerException e) {
} catch (Exception e) {
e.printStackTrace();
System.out.println(recordID);
}
}
/**
* Acknowledges the record of the given ID, if all the outputs have sent
* acknowledgments, removes it from the buffer
*
* @param recordID
* ID of the record that has been acknowledged
*/
// TODO: find a place to call timeoutRecords
public void ackRecord(String recordID) {
if (ackCounter.containsKey(recordID)) {
int ackCount = ackCounter.get(recordID) - 1;
if (ackCount == 0) {
removeRecord(recordID);
} else {
ackCounter.put(recordID, ackCount);
}
}
// timeoutRecords(System.currentTimeMillis());
}
/**
* Re-emits the failed record for the given ID, removes the old record and
* stores it with a new ID.
*
* @param recordID
* ID of the record that has been failed
*/
public void failRecord(String recordID) {
// Create new id to avoid double counting acks
System.out.println("Fail ID: " + recordID);
StreamRecord newRecord = popRecord(recordID).setId(channelID);
addRecord(newRecord);
reEmit(newRecord);
}
/**
* Emits the given record to all output channels
*
* @param record
* Record to be re-emitted
*/
public void reEmit(StreamRecord record) {
for (RecordWriter<StreamRecord> output : outputs) {
try {
output.emit(record);
System.out.println("Re-emitted");
} catch (Exception e) {
System.out.println("Re-emit failed");
}
}
}
public long getTIMEOUT() {
return this.TIMEOUT;
}
public void setTIMEOUT(long TIMEOUT) {
this.TIMEOUT = TIMEOUT;
}
public Map<String, StreamRecord> getRecordBuffer() {
return this.recordBuffer;
}
public Long getTimeOfLastUpdate() {
return this.timeOfLastUpdate;
}
public Map<String, Integer> getAckCounter() {
return this.ackCounter;
}
public SortedMap<Long, Set<String>> getRecordsByTime() {
return this.recordsByTime;
}
public Map<String, Long> getRecordTimestamps() {
return this.recordTimestamps;
}
public List<RecordWriter<StreamRecord>> getOutputs() {
return this.outputs;
}
public String getChannelID() {
return this.channelID;
}
public int getNumberOfOutputs() {
return this.numberOfOutputs;
}
void setNumberOfOutputs(int numberOfOutputs) {
this.numberOfOutputs = numberOfOutputs;
}
}
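A minimal sketch of the timeout path (not part of the commit; the class is listed here under its old name FaultTolerancyBuffer, which the rest of this commit renames to FaultToleranceBuffer):
package eu.stratosphere.streaming.api; // same package, so timeoutRecords and setNumberOfOutputs are visible
import java.util.LinkedList;
import java.util.List;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.StringValue;
public class TimeoutSketch {
	public static void main(String[] args) throws InterruptedException {
		List<RecordWriter<StreamRecord>> outputs = new LinkedList<RecordWriter<StreamRecord>>();
		FaultTolerancyBuffer buffer = new FaultTolerancyBuffer(outputs, "1");
		buffer.setNumberOfOutputs(1);
		buffer.setTIMEOUT(100); // shorten the timeout for the sketch
		StreamRecord record = new StreamRecord(new StringValue("payload")).setId("1");
		buffer.addRecord(record); // deliberately never acknowledged
		Thread.sleep(200); // let the record age past TIMEOUT
		// the unacked record is failed: removed, re-keyed with a fresh ID and re-emitted
		List<String> timedOut = buffer.timeoutRecords(System.currentTimeMillis());
		System.out.println("timed out: " + timedOut);
	}
}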
......@@ -13,13 +13,6 @@ import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
/**
* Object for storing serializable records in batches (single records are
* represented as batches with one element), used for sending records between task
* objects in Stratosphere stream processing. The elements of the batch are
* Value arrays.
*
*/
public class StreamRecord implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
......@@ -28,31 +21,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
private int numOfFields;
private int numOfRecords;
/**
* Creates a new empty batch of records and sets the field number to one
*/
public StreamRecord() {
this.numOfFields = 1;
recordBatch = new ArrayList<Value[]>();
}
/**
* Creates a new empty batch of records and sets the field number to the given
* number
*
* @param length
* Number of fields in the records
*/
public StreamRecord(int length) {
this.numOfFields = length;
recordBatch = new ArrayList<Value[]>();
}
/**
* Create a new batch of records with one element from an AtomRecord, sets
* number of fields accordingly
*
*/
public StreamRecord(AtomRecord record) {
Value[] fields = record.getFields();
numOfFields = fields.length;
......@@ -60,89 +38,37 @@ public class StreamRecord implements IOReadableWritable, Serializable {
recordBatch.add(fields);
numOfRecords = recordBatch.size();
}
/**
* Given an array of Values, creates a new record batch containing the array
* as its first element
*
* @param values
* Array containing the Values for the first record in the batch
*/
public StreamRecord(Value... values) {
numOfFields = values.length;
recordBatch = new ArrayList<Value[]>();
recordBatch.add(values);
numOfRecords = recordBatch.size();
public StreamRecord(Value... values){
this(new AtomRecord(values));
}
/**
*
* @return Number of fields in the records
*/
public int getNumOfFields() {
return numOfFields;
}
/**
*
* @return Number of records in the batch
*/
public int getNumOfRecords() {
return numOfRecords;
}
/**
* Set the ID of the StreamRecord object
*
* @param channelID
* ID of the emitting task
* @return The StreamRecord object
*/
public StreamRecord setId(String channelID) {
Random rnd = new Random();
uid.setValue(channelID + "-" + rnd.nextInt(1000));
return this;
}
/**
*
* @return The ID of the object
*/
public String getId() {
return uid.getValue();
}
/**
* Returns the Value of a field in the given position of a specific record in
* the batch
*
* @param recordNumber
* Position of the record in the batch
* @param fieldNumber
* Position of the field in the record
* @return Value of the field
*/
public Value getField(int recordNumber, int fieldNumber) {
return recordBatch.get(recordNumber)[fieldNumber];
}
/**
*
* @param recordNumber
* Position of the record in the batch
* @return AtomRecord object containing the fields of the record
*/
public AtomRecord getRecord(int recordNumber) {
return new AtomRecord(recordBatch.get(recordNumber));
}
/**
* Checks if the number of fields is equal to the batch field size, then adds
* the AtomRecord to the end of the batch
*
* @param record
* Record to be added
*/
public void addRecord(AtomRecord record) {
Value[] fields = record.getFields();
if (fields.length == numOfFields) {
......@@ -151,20 +77,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
}
}
/**
* Checks if the number of fields is equal to the batch field size, then adds
* the Value array to the end of the batch
*
* @param record
* Value array to be added as the next record of the batch
*/
public void addRecord(Value[] fields) {
if (fields.length == numOfFields) {
recordBatch.add(fields);
numOfRecords = recordBatch.size();
}
}
@Override
public void write(DataOutput out) throws IOException {
uid.write(out);
......
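A short sketch of building and reading a record batch with the StreamRecord API shown above (values invented for illustration):
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class StreamRecordSketch {
	public static void main(String[] args) {
		// one record with two fields, built from the varargs constructor
		StreamRecord batch = new StreamRecord(new StringValue("word"), new IntValue(1));
		// append a second record with the same number of fields
		batch.addRecord(new AtomRecord(new Value[] { new StringValue("count"), new IntValue(2) }));
		System.out.println(batch.getNumOfRecords()); // 2 records in the batch
		System.out.println(batch.getNumOfFields()); // 2 fields per record
		System.out.println(batch.getField(1, 0)); // field 0 of record 1
	}
}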
......@@ -3,7 +3,7 @@ package eu.stratosphere.streaming.api.invokable;
import java.util.List;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.FaultToleranceBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
public abstract class StreamInvokable {
......@@ -11,10 +11,10 @@ public abstract class StreamInvokable {
private List<RecordWriter<StreamRecord>> outputs;
protected String channelID;
private FaultTolerancyBuffer emittedRecords;
private FaultToleranceBuffer emittedRecords;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs,
String channelID, FaultTolerancyBuffer emittedRecords) {
String channelID, FaultToleranceBuffer emittedRecords) {
this.outputs = outputs;
this.channelID = channelID;
this.emittedRecords = emittedRecords;
......
......@@ -15,7 +15,7 @@ import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.AckEventListener;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FailEventListener;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.FaultToleranceBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
......@@ -28,7 +28,7 @@ import eu.stratosphere.types.StringValue;
public final class StreamComponentHelper<T extends AbstractInvokable> {
public void setAckListener(FaultTolerancyBuffer recordBuffer,
public void setAckListener(FaultToleranceBuffer recordBuffer,
String sourceInstanceID, List<RecordWriter<StreamRecord>> outputs) {
EventListener eventListener = new AckEventListener(sourceInstanceID,
recordBuffer);
......@@ -38,7 +38,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
}
}
public void setFailListener(FaultTolerancyBuffer recordBuffer,
public void setFailListener(FaultToleranceBuffer recordBuffer,
String sourceInstanceID, List<RecordWriter<StreamRecord>> outputs) {
EventListener eventListener = new FailEventListener(sourceInstanceID,
recordBuffer);
......@@ -101,7 +101,7 @@ public final class StreamComponentHelper<T extends AbstractInvokable> {
public StreamInvokable getUserFunction(Configuration taskConfiguration,
List<RecordWriter<StreamRecord>> outputs, String instanceID,
FaultTolerancyBuffer recordBuffer) {
FaultToleranceBuffer recordBuffer) {
// Default value is a TaskInvokable even if it was called from a source
Class<? extends StreamInvokable> userFunctionClass = taskConfiguration
......
......@@ -22,7 +22,7 @@ import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.FaultToleranceBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.test.RandIS;
......@@ -34,7 +34,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
private UserSourceInvokable userFunction;
private static int numSources = 0;
private String sourceInstanceID;
private FaultTolerancyBuffer recordBuffer;
private FaultToleranceBuffer recordBuffer;
public StreamSource() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -68,7 +68,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
e.printStackTrace();
}
recordBuffer = new FaultTolerancyBuffer(outputs, sourceInstanceID);
recordBuffer = new FaultToleranceBuffer(outputs, sourceInstanceID);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(
taskConfiguration, outputs, sourceInstanceID, recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
......
......@@ -25,7 +25,7 @@ import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.api.AckEvent;
import eu.stratosphere.streaming.api.FailEvent;
import eu.stratosphere.streaming.api.FaultTolerancyBuffer;
import eu.stratosphere.streaming.api.FaultToleranceBuffer;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
......@@ -39,7 +39,7 @@ public class StreamTask extends AbstractTask {
private String taskInstanceID = "";
StreamComponentHelper<StreamTask> streamTaskHelper;
private FaultTolerancyBuffer recordBuffer;
private FaultToleranceBuffer recordBuffer;
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
......@@ -64,7 +64,7 @@ public class StreamTask extends AbstractTask {
e.printStackTrace();
}
recordBuffer = new FaultTolerancyBuffer(outputs, taskInstanceID);
recordBuffer = new FaultToleranceBuffer(outputs, taskInstanceID);
userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(
taskConfiguration, outputs, taskInstanceID, recordBuffer);
streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs);
......
......@@ -19,7 +19,7 @@ import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.StringValue;
public class MyBatchStreamSink implements UserSinkInvokable {
public class BachForwardSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
......
......@@ -19,13 +19,13 @@ import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
public class MyBatchStream extends TestBase2{
public class BatchForward extends TestBase2{
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("StreamSource", MyBatchStreamSource.class);
graphBuilder.setSink("StreamSink", MyBatchStreamSink.class);
graphBuilder.setSource("StreamSource", BatchForwardSource.class);
graphBuilder.setSink("StreamSink", BachForwardSink.class);
graphBuilder.broadcastConnect("StreamSource", "StreamSink");
......
......@@ -15,22 +15,17 @@
package eu.stratosphere.streaming.test.batch;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.StringValue;
public class MyBatchStreamSource extends UserSourceInvokable {
// private final String motto = "Stratosphere Big Data looks tiny from here";
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private final AtomRecord record=new AtomRecord();
private final StreamRecord mottoRecord=new StreamRecord();
public class BatchForwardSource extends UserSourceInvokable {
private final StringValue motto = new StringValue("Stratosphere Big Data looks tiny from here");
private final StreamRecord mottoRecord = new StreamRecord(motto);
@Override
public void invoke() throws Exception {
record.setField(0, new StringValue(motto));
mottoRecord.addRecord(record);
for (int i = 0; i < 100; i++) {
for (int i = 0; i < 10; i++) {
emit(mottoRecord);
}
}
......
......@@ -13,25 +13,22 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.DefaultSourceInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.StringValue;
public class MyStream extends TestBase2 {
public class CellInfo extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", DefaultSourceInvokable.class);
graphBuilder.setSource("querySource", DefaultSourceInvokable.class);
graphBuilder.setTask("cellTask", DefaultTaskInvokable.class, 2);
graphBuilder.setSink("sink", DefaultSinkInvokable.class);
graphBuilder.setSource("infoSource", InfoSourceInvokable.class);
graphBuilder.setSource("querySource", QuerySourceInvokable.class);
graphBuilder.setTask("cellTask", CellTaskInvokable.class, 2);
graphBuilder.setSink("sink", CellSinkInvokable.class);
graphBuilder.fieldsConnect("infoSource", "cellTask", 0, StringValue.class);
graphBuilder.fieldsConnect("querySource", "cellTask",0, StringValue.class);
......
......@@ -13,13 +13,13 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.StringValue;
public class TestSinkInvokable implements UserSinkInvokable {
public class CellSinkInvokable implements UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
......
......@@ -13,17 +13,15 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.test.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class TestTaskInvokable extends UserTaskInvokable {
public class CellTaskInvokable extends UserTaskInvokable {
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
......@@ -35,15 +33,13 @@ public class TestTaskInvokable extends UserTaskInvokable {
// INFO
if (record.getNumOfFields() == 2) {
engine.put(value1.getValue(), value2.getValue());
emit(new StreamRecord(new AtomRecord(new StringValue(value1 + " "
+ value2))));
emit(new StreamRecord(new StringValue(value1 + " " + value2)));
}
// QUERY
else if (record.getNumOfFields() == 3) {
LongValue value3 = (LongValue) record.getField(0, 2);
emit(new StreamRecord(new AtomRecord(new StringValue(
String.valueOf(engine.get(value2.getValue(),
value3.getValue(), value1.getValue()))))));
emit(new StreamRecord(new StringValue(String.valueOf(engine.get(
value2.getValue(), value3.getValue(), value1.getValue())))));
}
}
}
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
......@@ -21,7 +21,7 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
public class TestSourceInvokable extends UserSourceInvokable {
public class InfoSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
......
......@@ -13,7 +13,7 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test;
package eu.stratosphere.streaming.test.cellinfo;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
......
......@@ -27,15 +27,20 @@ public class WindowWordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1);
graphBuilder.setSource("WindowWordCountSource",
WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter",
WindowWordCountSplitter.class, 1);
graphBuilder.setTask("WindowWordCountCounter",
WindowWordCountCounter.class, 1);
graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
graphBuilder.broadcastConnect("WindowWordCountSource", "WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WindowWordCountCounter", "WindowWordCountSink");
graphBuilder.broadcastConnect("WindowWordCountSource",
"WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter",
"WindowWordCountCounter", 0, StringValue.class);
graphBuilder.broadcastConnect("WindowWordCountCounter",
"WindowWordCountSink");
return graphBuilder.getJobGraph();
}
......
......@@ -26,10 +26,10 @@ import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize = 100;
private int slidingStep = 20;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
......@@ -40,8 +40,8 @@ public class WindowWordCountCounter extends UserTaskInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue=(StringValue) record.getField(0, 0);
timestamp=(LongValue) record.getField(0, 1);
wordValue = (StringValue) record.getField(0, 0);
timestamp = (LongValue) record.getField(0, 1);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
......
......@@ -18,7 +18,6 @@ package eu.stratosphere.streaming.test.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
......@@ -30,14 +29,13 @@ public class WordCountCounter extends UserTaskInvokable {
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private String word = "";
private AtomRecord outputRecord = new AtomRecord(2);
private int count = 1;
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue = (StringValue) record.getRecord(0).getField(0);
word = wordValue.getValue();
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -46,8 +44,7 @@ public class WordCountCounter extends UserTaskInvokable {
wordCounts.put(word, 1);
countValue.setValue(1);
}
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
emit(new StreamRecord(outputRecord));
// TODO: object reuse
emit(new StreamRecord(wordValue, countValue));
}
}
......@@ -23,7 +23,7 @@ import eu.stratosphere.types.StringValue;
public class WordCountSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
private IntValue count = new IntValue();
@Override
public void invoke(StreamRecord record) throws Exception {
......
......@@ -15,23 +15,43 @@
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class WordCountSource extends UserSourceInvokable {
// private final String motto =
// "Stratosphere Big Data looks tiny from here";
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private StreamRecord mottoRecord;
private BufferedReader br = null;
private String line = new String();
private StringValue lineValue = new StringValue();
private Value[] hamletValues = new StringValue[1];
private StreamRecord hamletRecord = new StreamRecord(1);
public WordCountSource() {
try {
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
@Override
public void invoke() throws Exception {
mottoRecord = new StreamRecord(new AtomRecord(new StringValue(motto)));
for (int i = 0; i < 10000; i++) {
emit(mottoRecord);
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
lineValue.setValue(line);
hamletValues[0] = lineValue;
// TODO: use hamletRecord instead
emit(new StreamRecord(hamletValues));
}
line = br.readLine();
}
}
}
\ No newline at end of file
......@@ -15,7 +15,6 @@
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.StringValue;
......@@ -25,18 +24,15 @@ public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private AtomRecord outputRecord = new AtomRecord(wordValue);
@Override
public void invoke(StreamRecord record) throws Exception {
//record.getFieldInto(0, sentence);
sentence = (StringValue) record.getRecord(0).getField(0);
System.out.println("to split: " + sentence.getValue());
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
outputRecord.setField(0, wordValue);
emit(new StreamRecord(outputRecord));
emit(new StreamRecord(wordValue));
}
}
}
\ No newline at end of file
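To make the data flow of the WordCount classes above concrete, here is a tiny standalone walk-through of the split-then-count logic (illustration only; the real classes run as Stratosphere invokables and emit StreamRecords instead of printing):
import java.util.HashMap;
import java.util.Map;
public class WordCountFlowSketch {
	public static void main(String[] args) {
		String line = "Big Data looks tiny from here Big Data";
		Map<String, Integer> counts = new HashMap<String, Integer>();
		for (String word : line.split(" ")) { // WordCountSplitter: one record per word
			int count = counts.containsKey(word) ? counts.get(word) + 1 : 1;
			counts.put(word, count); // WordCountCounter: running count per word
			System.out.println(word + " " + count); // WordCountSink would receive (word, count)
		}
	}
}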
package eu.stratosphere.streaming.api;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.LinkedList;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.StringValue;
public class FaultTolerancyBufferTest {
FaultTolerancyBuffer faultTolerancyBuffer;
List<RecordWriter<StreamRecord>> outputs;
@Before
public void setFaultTolerancyBuffer() {
outputs = new LinkedList<RecordWriter<StreamRecord>>();
faultTolerancyBuffer = new FaultTolerancyBuffer(outputs, "1");
faultTolerancyBuffer.setNumberOfOutputs(3);
}
@Test
public void testFaultTolerancyBuffer() {
assertEquals(3, faultTolerancyBuffer.getNumberOfOutputs());
assertEquals(outputs, faultTolerancyBuffer.getOutputs());
assertEquals("1", faultTolerancyBuffer.getChannelID());
}
@Test
public void testAddRecord() {
StreamRecord record = (new StreamRecord(1)).setId("1");
record.addRecord(new AtomRecord(new StringValue("V1")));
faultTolerancyBuffer.addRecord(record);
assertEquals((Integer) 3, faultTolerancyBuffer.getAckCounter().get(record.getId()));
assertEquals(record,faultTolerancyBuffer.getRecordBuffer().get(record.getId()));
}
@Test
public void testAddTimestamp() {
Long cTime = System.currentTimeMillis();
faultTolerancyBuffer.addTimestamp("1-1337");
Long recordTimeStamp = faultTolerancyBuffer.getRecordTimestamps().get(
"1-1337");
assertTrue(recordTimeStamp - cTime < 2);
String[] records = new String[] { "1-1337" };
assertArrayEquals(records,
faultTolerancyBuffer.getRecordsByTime().get(recordTimeStamp).toArray());
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
faultTolerancyBuffer.addTimestamp("1-1338");
faultTolerancyBuffer.addTimestamp("1-1339");
long recordTimeStamp1 = faultTolerancyBuffer.getRecordTimestamps().get(
"1-1338");
long recordTimeStamp2 = faultTolerancyBuffer.getRecordTimestamps().get(
"1-1339");
records = new String[] { "1-1338","1-1339"};
if(recordTimeStamp1==recordTimeStamp2){
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(recordTimeStamp1).contains("1-1338"));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(recordTimeStamp1).contains("1-1339"));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(recordTimeStamp1).size()==2);
}
}
@Test
public void testPopRecord() {
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.addRecord(new AtomRecord(new StringValue("V1")));
faultTolerancyBuffer.addRecord(record1);
assertEquals(record1, faultTolerancyBuffer.popRecord(record1.getId()));
System.out.println("---------");
}
@Test
public void testRemoveRecord() {
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.addRecord(new AtomRecord(new StringValue("V1")));
StreamRecord record2 = (new StreamRecord(1)).setId("1");
record2.addRecord(new AtomRecord(new StringValue("V2")));
faultTolerancyBuffer.addRecord(record1);
faultTolerancyBuffer.addRecord(record2);
Long record1TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
Long record2TS=faultTolerancyBuffer.getRecordTimestamps().get(record2.getId());
faultTolerancyBuffer.removeRecord(record1.getId());
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(record2TS).contains(record2.getId()));
assertFalse(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(record1.getId()));
}
@Test
public void testAckRecord() {
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.addRecord(new AtomRecord(new StringValue("V1")));
faultTolerancyBuffer.addRecord(record1);
Long record1TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
assertEquals((Integer) 1, faultTolerancyBuffer.getAckCounter().get(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(record1.getId()));
faultTolerancyBuffer.ackRecord(record1.getId());
assertFalse(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(record1.getId()));
faultTolerancyBuffer.ackRecord(record1.getId());
}
@Test
public void testFailRecord() {
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.addRecord(new AtomRecord(new StringValue("V1")));
faultTolerancyBuffer.addRecord(record1);
Long record1TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(record1.getId()));
String prevID = record1.getId();
faultTolerancyBuffer.failRecord(record1.getId());
Long record2TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
assertFalse(faultTolerancyBuffer.getRecordBuffer().containsKey(prevID));
assertFalse(faultTolerancyBuffer.getAckCounter().containsKey(prevID));
assertFalse(faultTolerancyBuffer.getRecordTimestamps().containsKey(prevID));
assertFalse(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(prevID));
faultTolerancyBuffer.ackRecord(prevID);
faultTolerancyBuffer.ackRecord(prevID);
faultTolerancyBuffer.ackRecord(prevID);
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(record2TS).contains(record1.getId()));
System.out.println("---------");
}
//TODO: create more tests for this method
@Test
public void testTimeOutRecords() {
faultTolerancyBuffer.setTIMEOUT(1000);
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.addRecord(new AtomRecord(new StringValue("V1")));
StreamRecord record2 = (new StreamRecord(1)).setId("1");
record2.addRecord(new AtomRecord(new StringValue("V2")));
StreamRecord record3 = (new StreamRecord(1)).setId("1");
record3.addRecord(new AtomRecord(new StringValue("V3")));
faultTolerancyBuffer.addRecord(record1);
faultTolerancyBuffer.addRecord(record2);
try {
Thread.sleep(500);
} catch (Exception e) {
}
faultTolerancyBuffer.addRecord(record3);
Long record1TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
Long record2TS=faultTolerancyBuffer.getRecordTimestamps().get(record2.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
faultTolerancyBuffer.ackRecord(record2.getId());
faultTolerancyBuffer.ackRecord(record3.getId());
faultTolerancyBuffer.ackRecord(record3.getId());
try {
Thread.sleep(501);
} catch (InterruptedException e) {
}
List<String> timedOutRecords = faultTolerancyBuffer.timeoutRecords(System.currentTimeMillis());
System.out.println("timedOutRecords: "+ timedOutRecords);
assertEquals(1, timedOutRecords.size());
assertFalse(timedOutRecords.contains(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordsByTime().containsKey(record1TS));
assertFalse(faultTolerancyBuffer.getRecordsByTime().containsKey(record2TS));
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record2.getId()));
System.out.println(faultTolerancyBuffer.getAckCounter());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
timedOutRecords = faultTolerancyBuffer.timeoutRecords(System.currentTimeMillis());
assertEquals(null,timedOutRecords);
try {
Thread.sleep(901);
} catch (InterruptedException e) {
}
timedOutRecords = faultTolerancyBuffer.timeoutRecords(System.currentTimeMillis());
System.out.println("timedOutRecords: "+ timedOutRecords);
assertEquals(2, timedOutRecords.size());
System.out.println(faultTolerancyBuffer.getAckCounter());
System.out.println("---------");
}
}