提交 61531e99 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] enable batch sending

上级 ba48c65b
package eu.stratosphere.streaming.api;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class AtomRecord {
private Value[] record;
public AtomRecord(){
record=new Value[1];
}
public AtomRecord(int length){
record=new Value[length];
}
public AtomRecord(Value[] fields){
record=fields;
}
public AtomRecord(Value fields) {
record=new Value[1];
record[0]=fields;
}
public Value[] getFields(){
return record;
}
public Value getField(int fieldNumber){
return record[fieldNumber];
}
public void setField(int fieldNumber, Value value){
record[fieldNumber]=value;
}
}
......@@ -61,14 +61,14 @@ public class FaultTolerancyBuffer {
}
// TODO: use this method!
List<String> timeoutRecords(Long currentTime) {
private void timeoutRecords() {
Long currentTime = System.currentTimeMillis();
if (timeOfLastUpdate + TIMEOUT < currentTime) {
List<String> timedOutRecords = new LinkedList<String>();
Map<Long, Set<String>> timedOut = recordsByTime.subMap(0L, currentTime
- TIMEOUT);
for (Set<String> recordSet : timedOut.values()) {
if (!recordSet.isEmpty()) {
for (String recordID : recordSet) {
......@@ -81,12 +81,7 @@ public class FaultTolerancyBuffer {
for (String recordID : timedOutRecords) {
failRecord(recordID);
}
timeOfLastUpdate = currentTime;
return timedOutRecords;
}
return null;
}
public void addTimestamp(String recordID) {
......@@ -100,6 +95,7 @@ public class FaultTolerancyBuffer {
recordSet.add(recordID);
recordsByTime.put(currentTime, recordSet);
}
// System.out.println(currentTime.toString()+" : "+recordsByTime.get(currentTime).toString());
}
public StreamRecord popRecord(String recordID) {
......@@ -109,17 +105,15 @@ public class FaultTolerancyBuffer {
return record;
}
void removeRecord(String recordID) {
private void removeRecord(String recordID) {
recordBuffer.remove(recordID);
ackCounter.remove(recordID);
try {
Long ts = recordTimestamps.remove(recordID);
recordsByTime.get(ts).remove(recordID);
} catch (NullPointerException e) {
}
catch (Exception e) {
e.printStackTrace();
} catch (Exception e) {
System.out.println(e.getMessage());
System.out.println(recordID);
}
}
......@@ -142,7 +136,6 @@ public class FaultTolerancyBuffer {
// Create new id to avoid double counting acks
System.out.println("Fail ID: " + recordID);
StreamRecord newRecord = popRecord(recordID).setId(channelID);
addRecord(newRecord);
reEmit(newRecord);
}
......
......@@ -4,6 +4,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import eu.stratosphere.core.io.IOReadableWritable;
......@@ -14,37 +16,46 @@ import eu.stratosphere.types.Value;
public class StreamRecord implements IOReadableWritable, Serializable {
private static final long serialVersionUID = 1L;
private Value[] fields;
private List<Value[]> recordBatch;
private StringValue uid = new StringValue("");
// it seems that we never use this variable.
private String channelID = "";
private int numOfFields;
private int numOfRecords;
public StreamRecord() {
this.numOfFields = 1;
fields = new Value[1];
//setId();
recordBatch = new ArrayList<Value[]>();
// setId();
}
public StreamRecord(int length) {
this.numOfFields = length;
fields = new Value[length];
// setId();
recordBatch = new ArrayList<Value[]>();
// setId();
}
public StreamRecord(int length, String channelID) {
this(length);
setChannelId(channelID);
}
public StreamRecord(Value value) {
this(1);
fields[0] = value;
public StreamRecord(AtomRecord record){
Value[] fields=record.getFields();
numOfFields = fields.length;
recordBatch = new ArrayList<Value[]>();
recordBatch.add(fields);
numOfRecords=recordBatch.size();
}
public int getNumOfFields() {
return numOfFields;
}
public int getNumOfRecords() {
return numOfRecords;
}
public StreamRecord setId(String channelID) {
Random rnd = new Random();
uid.setValue(channelID + "-" + rnd.nextInt(1000));
......@@ -55,16 +66,20 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return uid.getValue();
}
public Value getField(int fieldNumber) {
return fields[fieldNumber];
public Value getField(int recordNumber, int fieldNumber) {
return recordBatch.get(recordNumber)[fieldNumber];
}
public AtomRecord getRecord(int recordNumber){
return new AtomRecord(recordBatch.get(recordNumber));
}
// public void getFieldInto(int fieldNumber, Value value) {
// value = fields[fieldNumber];
// }
public void setField(int fieldNumber, Value value) {
fields[fieldNumber] = value;
public void addRecord(AtomRecord record) {
Value[] fields = record.getFields();
if (fields.length == numOfFields) {
recordBatch.add(fields);
numOfRecords = recordBatch.size();
}
}
@Override
......@@ -74,10 +89,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
// Write the number of fields with an IntValue
(new IntValue(numOfFields)).write(out);
// Write the fields
for (int i = 0; i < numOfFields; i++) {
(new StringValue(fields[i].getClass().getName())).write(out);
fields[i].write(out);
// Write the number of records with an IntValue
(new IntValue(numOfRecords)).write(out);
// write the records
for (Value[] record : recordBatch) {
// Write the fields
for (int i = 0; i < numOfFields; i++) {
(new StringValue(record[i].getClass().getName())).write(out);
record[i].write(out);
}
}
}
......@@ -90,23 +111,34 @@ public class StreamRecord implements IOReadableWritable, Serializable {
numOfFieldsValue.read(in);
numOfFields = numOfFieldsValue.getValue();
// Get the number of records
IntValue numOfRecordsValue = new IntValue(0);
numOfRecordsValue.read(in);
numOfRecords = numOfRecordsValue.getValue();
// Make sure the fields have numOfFields elements
fields = new Value[numOfFields];
// Read the fields
for (int i = 0; i < numOfFields; i++) {
StringValue stringValue = new StringValue("");
stringValue.read(in);
try {
fields[i] = (Value) Class.forName(stringValue.getValue()).newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
recordBatch = new ArrayList<Value[]>();
for (int k = 0; k < numOfRecords; ++k) {
Value[] record = new Value[numOfFields];
// recordBatch=new Value[numOfFields];
// Read the fields
for (int i = 0; i < numOfFields; i++) {
StringValue stringValue = new StringValue("");
stringValue.read(in);
try {
record[i] = (Value) Class.forName(stringValue.getValue())
.newInstance();
} catch (InstantiationException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
record[i].read(in);
}
fields[i].read(in);
recordBatch.add(record);
}
}
......@@ -123,15 +155,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
public String toString() {
StringBuilder outputString = new StringBuilder();
StringValue output; // = new StringValue("");
for (int k = 0; k < numOfRecords; ++k) {
for (int i = 0; i < numOfFields; i++) {
try {
output = (StringValue) recordBatch.get(k)[i];
outputString.append(output.getValue() + "*");
} catch (ClassCastException e) {
outputString.append("PRINT_ERROR*");
}
for (int i = 0; i < this.getNumOfFields(); i++) {
try {
output = (StringValue) fields[i];
outputString.append(output.getValue() + "*");
} catch (ClassCastException e) {
outputString.append("PRINT_ERROR*");
}
}
return outputString.toString();
}
......
......@@ -22,7 +22,7 @@ public class DefaultSinkInvokable implements UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
StringValue value = (StringValue) record.getField(0);
StringValue value = (StringValue) record.getField(0, 0);
System.out.println(value.getValue());
}
......
......@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.StringValue;
......@@ -26,7 +27,7 @@ public class DefaultSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (CharSequence word : mottoArray) {
emit(new StreamRecord(new StringValue(word)));
emit(new StreamRecord(new AtomRecord(new StringValue(word))));
}
}
......
......@@ -41,7 +41,8 @@ public class FieldsPartitioner implements ChannelSelector<StreamRecord> {
} catch (IllegalAccessException e) {
e.printStackTrace();
}
key = keyClass.cast(record.getField(keyPosition));
//TODO: consider hash partition the whole record batch.
key = keyClass.cast(record.getField(0, keyPosition));
return new int[] { Math.abs(key.hashCode()) % numberOfOutputChannels };
}
}
......@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue;
......@@ -22,22 +23,26 @@ import eu.stratosphere.types.LongValue;
public class QuerySourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 5; i++) {
StreamRecord record1 = new StreamRecord(3);
record1.setField(0, new IntValue(5));
record1.setField(1, new LongValue(510));
record1.setField(2, new LongValue(100));
StreamRecord record2 = new StreamRecord(3);
record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(510));
record2.setField(2, new LongValue(100));
emit(record1);
emit(record2);
}
}
@Override
public void invoke() throws Exception {
for (int i = 0; i < 5; i++) {
StreamRecord batch1 = new StreamRecord(3);
AtomRecord record1 = new AtomRecord(3);
record1.setField(0, new IntValue(5));
record1.setField(1, new LongValue(510));
record1.setField(2, new LongValue(100));
batch1.addRecord(record1);
StreamRecord batch2 = new StreamRecord(3);
AtomRecord record2=new AtomRecord(3);
record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(510));
record2.setField(2, new LongValue(100));
batch2.addRecord(record2);
emit(batch1);
emit(batch2);
}
}
}
......@@ -23,7 +23,7 @@ public class TestSinkInvokable implements UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
StringValue value = (StringValue) record.getField(0);
StringValue value = (StringValue) record.getField(0, 0);
System.out.println(value.getValue());
}
......
......@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue;
......@@ -22,18 +23,22 @@ import eu.stratosphere.types.LongValue;
public class TestSourceInvokable extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 10; i++) {
StreamRecord record1 = new StreamRecord(2);
record1.setField(0, new IntValue(5));
record1.setField(1, new LongValue(500));
StreamRecord record2 = new StreamRecord(2);
record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(500));
emit(record1);
emit(record2);
}
}
@Override
public void invoke() throws Exception {
for (int i = 0; i < 10; i++) {
StreamRecord batch1 = new StreamRecord(2);
AtomRecord record1 = new AtomRecord(2);
record1.setField(0, new IntValue(5));
record1.setField(1, new LongValue(500));
StreamRecord batch2 = new StreamRecord(2);
AtomRecord record2 = new AtomRecord(2);
record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(500));
emit(batch1);
emit(batch2);
}
}
}
......@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.test.cellinfo.WorkerEngineExact;
......@@ -24,23 +25,25 @@ import eu.stratosphere.types.StringValue;
public class TestTaskInvokable extends UserTaskInvokable {
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
@Override
public void invoke(StreamRecord record) throws Exception {
IntValue value1 = (IntValue) record.getField(0);
LongValue value2 = (LongValue) record.getField(1);
// INFO
if (record.getNumOfFields() == 2) {
engine.put(value1.getValue(), value2.getValue());
emit(new StreamRecord(new StringValue(value1 + " " + value2)));
}
// QUERY
else if (record.getNumOfFields() == 3) {
LongValue value3 = (LongValue) record.getField(2);
emit(new StreamRecord(new StringValue(String.valueOf(engine.get(
value2.getValue(), value3.getValue(), value1.getValue())))));
}
}
@Override
public void invoke(StreamRecord record) throws Exception {
IntValue value1 = (IntValue) record.getField(0, 0);
LongValue value2 = (LongValue) record.getField(0, 1);
// INFO
if (record.getNumOfFields() == 2) {
engine.put(value1.getValue(), value2.getValue());
emit(new StreamRecord(new AtomRecord(new StringValue(value1 + " "
+ value2))));
}
// QUERY
else if (record.getNumOfFields() == 3) {
LongValue value3 = (LongValue) record.getField(0, 2);
emit(new StreamRecord(new AtomRecord(new StringValue(
String.valueOf(engine.get(value2.getValue(),
value3.getValue(), value1.getValue()))))));
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
public class MyBatchStream extends TestBase2{
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("StreamSource", StreamSource.class);
graphBuilder.setSink("StreamSink", StreamSink.class);
graphBuilder.broadcastConnect("StreamSource", "StreamSink");
return graphBuilder.getJobGraph();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
public class StreamSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
@Override
public void invoke(StreamRecord record) throws Exception {
word = (StringValue) record.getField(0, 0);
// count = (IntValue) record.getField(1);
System.out.println("========" + word.getValue() + "=========");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.StringValue;
public class StreamSource extends UserSourceInvokable {
// private final String motto = "Stratosphere Big Data looks tiny from here";
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private final AtomRecord record=new AtomRecord();
private final StreamRecord mottoRecord=new StreamRecord();
@Override
public void invoke() throws Exception {
record.setField(0, new StringValue(motto));
mottoRecord.addRecord(record);
for (int i = 0; i < 100; i++) {
emit(mottoRecord);
}
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch.wordcount;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.StringValue;
public class BatchWordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2);
graphBuilder.setSink("BatchWordCountSink", BatchWordCountSink.class);
graphBuilder.broadcastConnect("BatchWordCountSource", "BatchWordCountSplitter");
graphBuilder.fieldsConnect("BatchWordCountSplitter", "BatchWordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("BatchWordCountCounter", "BatchWordCountSink");
return graphBuilder.getJobGraph();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class BatchWordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private LongValue timestamp = new LongValue(0);
private String word = "";
private AtomRecord outputRecord = new AtomRecord(3);
private int count = 1;
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue=(StringValue) record.getField(0, 0);
timestamp=(LongValue) record.getField(0, 1);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
countValue.setValue(count);
} else {
wordCounts.put(word, 1);
countValue.setValue(1);
}
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
outputRecord.setField(2, timestamp);
emit(new StreamRecord(outputRecord));
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch.wordcount;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class BatchWordCountSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
private LongValue timestamp = new LongValue(0);
@Override
public void invoke(StreamRecord record) throws Exception {
word=(StringValue) record.getField(0, 0);
count=(IntValue) record.getField(0, 1);
timestamp=(LongValue) record.getField(0, 2);
System.out.println("============================================");
System.out.println(word.getValue() + " " + count.getValue() + " "
+ timestamp.getValue());
System.out.println("============================================");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class BatchWordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line;
private long timestamp;
private StreamRecord hamletRecords = new StreamRecord(2);
public BatchWordCountSource() {
try {
br = new BufferedReader(
new FileReader(
"src/main/java/eu/stratosphere/streaming/test/batch/wordcount/hamlet.txt"));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void invoke() throws Exception {
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
timestamp = 0;
while (line != null) {
if (line != "") {
AtomRecord hamletRecord = new AtomRecord(2);
hamletRecord.setField(0, new StringValue(line));
hamletRecord.setField(1, new LongValue(timestamp));
hamletRecords.addRecord(hamletRecord);
++timestamp;
if(timestamp%10==0){
emit(hamletRecords);
}
line = br.readLine();
}
}
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.batch.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class BatchWordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private LongValue timestamp = new LongValue(0);
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private AtomRecord outputRecord = new AtomRecord(2);
@Override
public void invoke(StreamRecord record) throws Exception {
int numberOfRecords = record.getNumOfRecords();
for(int i=0; i<numberOfRecords; ++i){
sentence = (StringValue) record.getField(i, 0);
timestamp = (LongValue) record.getField(i, 1);
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
outputRecord.setField(0, wordValue);
outputRecord.setField(1, timestamp);
emit(new StreamRecord(outputRecord));
}
}
}
}
\ No newline at end of file
package eu.stratosphere.streaming.test.window.wordcount;
public class IncrementalWindow {
private int currentTupleNum;
private int fullTupleNum;
private int slideTupleNum;
public IncrementalWindow(int batchRange, int windowSize, int slidingStep){}
void pushBack(){}
void popFront(){}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.StringValue;
//TODO: window operator remains unfinished.
public class WindowWordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
// graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1);
// graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1);
graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
graphBuilder.broadcastConnect("WindowWordCountSource", "WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WindowWordCountCounter", "WindowWordCountSink");
return graphBuilder.getJobGraph();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize = 100;
private int slidingStep = 20;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private LongValue timestamp = new LongValue(0);
private String word = "";
private AtomRecord outputRecord = new AtomRecord(3);
private int count = 1;
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue=(StringValue) record.getField(0, 0);
timestamp=(LongValue) record.getField(0, 1);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
countValue.setValue(count);
} else {
wordCounts.put(word, 1);
countValue.setValue(1);
}
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
outputRecord.setField(2, timestamp);
emit(new StreamRecord(outputRecord));
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class WindowWordCountSink implements UserSinkInvokable {
private StringValue word = new StringValue("");
private IntValue count = new IntValue(1);
private LongValue timestamp = new LongValue(0);
@Override
public void invoke(StreamRecord record) throws Exception {
word=(StringValue) record.getField(0, 0);
count=(IntValue) record.getField(0, 1);
timestamp=(LongValue) record.getField(0, 2);
System.out.println("============================================");
System.out.println(word.getValue() + " " + count.getValue() + " "
+ timestamp.getValue());
System.out.println("============================================");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class WindowWordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line;
private long timestamp;
private AtomRecord hamletRecord = null;
public WindowWordCountSource() {
try {
br = new BufferedReader(
new FileReader(
"src/main/java/eu/stratosphere/streaming/test/window/wordcount/hamlet.txt"));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void invoke() throws Exception {
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
timestamp = 0;
// while (line != null) {
for (int i = 0; i < 2; ++i) {
if (line != "") {
hamletRecord = new AtomRecord(2);
hamletRecord.setField(0, new StringValue(line));
hamletRecord.setField(1, new LongValue(timestamp));
System.out.println("========line number: " + timestamp + ", "
+ line + "==========");
emit(new StreamRecord(hamletRecord));
line = br.readLine();
++timestamp;
}
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.test.window.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.StringValue;
public class WindowWordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private LongValue timestamp = new LongValue(0);
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private AtomRecord outputRecord = new AtomRecord(2);
@Override
public void invoke(StreamRecord record) throws Exception {
sentence=(StringValue) record.getField(0, 0);
timestamp=(LongValue) record.getField(0, 1);
System.out.println("************sentence=" + sentence.getValue() + ", timestamp="
+ timestamp.getValue()+"************");
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
outputRecord.setField(0, wordValue);
outputRecord.setField(1, timestamp);
emit(new StreamRecord(outputRecord));
}
}
}
\ No newline at end of file
......@@ -17,8 +17,8 @@ package eu.stratosphere.streaming.test.wordcount;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.IntValue;
......@@ -30,28 +30,24 @@ public class WordCountCounter extends UserTaskInvokable {
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private String word = "";
private StreamRecord outputRecord = new StreamRecord(2);
private AtomRecord outputRecord = new AtomRecord(2);
private int count = 1;
@Override
public void invoke(StreamRecord record) throws Exception {
wordValue = (StringValue) record.getField(0);
wordValue = (StringValue) record.getRecord(0).getField(0);
word = wordValue.getValue();
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
countValue.setValue(count);
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
emit(outputRecord);
} else {
wordCounts.put(word, 1);
countValue.setValue(1);
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
emit(outputRecord);
}
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
emit(new StreamRecord(outputRecord));
}
}
......@@ -28,8 +28,8 @@ public class WordCountSink implements UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
word = (StringValue) record.getField(0);
count = (IntValue) record.getField(1);
word = (StringValue) record.getField(0, 0);
count = (IntValue) record.getField(0, 1);
System.out.println(word.getValue() + " " + count.getValue());
......
......@@ -15,21 +15,23 @@
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.StringValue;
public class WordCountSource extends UserSourceInvokable {
// private final String motto = "Stratosphere Big Data looks tiny from here";
// private final String motto =
// "Stratosphere Big Data looks tiny from here";
private final String motto = "Gyuszi Gabor Big Marci Gyuszi";
private final StreamRecord mottoRecord = new StreamRecord(new StringValue(motto));
private StreamRecord mottoRecord;
@Override
public void invoke() throws Exception {
mottoRecord = new StreamRecord(new AtomRecord(new StringValue(motto)));
for (int i = 0; i < 10000; i++) {
emit(mottoRecord);
}
}
}
\ No newline at end of file
......@@ -15,6 +15,7 @@
package eu.stratosphere.streaming.test.wordcount;
import eu.stratosphere.streaming.api.AtomRecord;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.StringValue;
......@@ -24,18 +25,18 @@ public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private StreamRecord outputRecord = new StreamRecord(wordValue);
private AtomRecord outputRecord = new AtomRecord(wordValue);
@Override
public void invoke(StreamRecord record) throws Exception {
//record.getFieldInto(0, sentence);
sentence = (StringValue) record.getField(0);
sentence = (StringValue) record.getRecord(0).getField(0);
System.out.println("to split: " + sentence.getValue());
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
outputRecord.setField(0, wordValue);
emit(outputRecord);
emit(new StreamRecord(outputRecord));
}
}
}
\ No newline at end of file
......@@ -6,11 +6,9 @@ import java.util.LinkedList;
import java.util.List;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.StringValue;
public class FaultTolerancyBufferTest {
......@@ -21,233 +19,59 @@ public class FaultTolerancyBufferTest {
public void setFaultTolerancyBuffer() {
outputs = new LinkedList<RecordWriter<StreamRecord>>();
faultTolerancyBuffer = new FaultTolerancyBuffer(outputs, "1");
faultTolerancyBuffer.setNumberOfOutputs(3);
}
@Test
public void testFaultTolerancyBuffer() {
assertEquals(3, faultTolerancyBuffer.getNumberOfOutputs());
assertEquals(0, faultTolerancyBuffer.getNumberOfOutputs());
assertEquals(outputs, faultTolerancyBuffer.getOutputs());
assertEquals("1", faultTolerancyBuffer.getChannelID());
faultTolerancyBuffer.setNumberOfOutputs(3);
assertEquals(3, faultTolerancyBuffer.getNumberOfOutputs());
}
@Test
public void testAddRecord() {
StreamRecord record = (new StreamRecord(1)).setId("1");
record.setField(0, new StringValue("V1"));
faultTolerancyBuffer.addRecord(record);
assertEquals((Integer) 3, faultTolerancyBuffer.getAckCounter().get(record.getId()));
assertEquals(record,faultTolerancyBuffer.getRecordBuffer().get(record.getId()));
// fail("Not yet implemented");
}
@Test
public void testAddTimestamp() {
Long cTime = System.currentTimeMillis();
faultTolerancyBuffer.addTimestamp("1-1337");
Long recordTimeStamp = faultTolerancyBuffer.getRecordTimestamps().get(
"1-1337");
assertTrue(recordTimeStamp - cTime < 2);
String[] records = new String[] { "1-1337" };
assertArrayEquals(records,
faultTolerancyBuffer.getRecordsByTime().get(recordTimeStamp).toArray());
try {
Thread.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
faultTolerancyBuffer.addTimestamp("1-1338");
faultTolerancyBuffer.addTimestamp("1-1339");
long recordTimeStamp1 = faultTolerancyBuffer.getRecordTimestamps().get(
"1-1338");
long recordTimeStamp2 = faultTolerancyBuffer.getRecordTimestamps().get(
"1-1339");
records = new String[] { "1-1338","1-1339"};
if(recordTimeStamp1==recordTimeStamp2){
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(recordTimeStamp1).contains("1-1338"));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(recordTimeStamp1).contains("1-1339"));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(recordTimeStamp1).size()==2);
}
// fail("Not yet implemented");
}
@Test
public void testPopRecord() {
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.setField(0, new StringValue("V1"));
faultTolerancyBuffer.addRecord(record1);
assertEquals(record1, faultTolerancyBuffer.popRecord(record1.getId()));
System.out.println("---------");
}
@Test
public void testRemoveRecord() {
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.setField(0, new StringValue("V1"));
StreamRecord record2 = (new StreamRecord(1)).setId("1");
record2.setField(0, new StringValue("V2"));
faultTolerancyBuffer.addRecord(record1);
faultTolerancyBuffer.addRecord(record2);
Long record1TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
Long record2TS=faultTolerancyBuffer.getRecordTimestamps().get(record2.getId());
faultTolerancyBuffer.removeRecord(record1.getId());
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(record2TS).contains(record2.getId()));
assertFalse(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(record1.getId()));
// fail("Not yet implemented");
}
@Test
public void testAckRecord() {
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.setField(0, new StringValue("V1"));
faultTolerancyBuffer.addRecord(record1);
Long record1TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
assertEquals((Integer) 1, faultTolerancyBuffer.getAckCounter().get(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(record1.getId()));
faultTolerancyBuffer.ackRecord(record1.getId());
assertFalse(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(record1.getId()));
faultTolerancyBuffer.ackRecord(record1.getId());
// fail("Not yet implemented");
}
@Test
public void testFailRecord() {
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.setField(0, new StringValue("V1"));
faultTolerancyBuffer.addRecord(record1);
Long record1TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(record1.getId()));
String prevID = record1.getId();
faultTolerancyBuffer.failRecord(record1.getId());
Long record2TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
assertFalse(faultTolerancyBuffer.getRecordBuffer().containsKey(prevID));
assertFalse(faultTolerancyBuffer.getAckCounter().containsKey(prevID));
assertFalse(faultTolerancyBuffer.getRecordTimestamps().containsKey(prevID));
assertFalse(faultTolerancyBuffer.getRecordsByTime().get(record1TS).contains(prevID));
faultTolerancyBuffer.ackRecord(prevID);
faultTolerancyBuffer.ackRecord(prevID);
faultTolerancyBuffer.ackRecord(prevID);
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record1.getId()));
assertTrue(faultTolerancyBuffer.getRecordsByTime().get(record2TS).contains(record1.getId()));
System.out.println("---------");
// fail("Not yet implemented");
}
@Test
public void testReEmit() {
// fail("Not yet implemented");
}
@Test
public void testGetRecordBuffer() {
// fail("Not yet implemented");
}
//TODO: create more tests for this method
@Test
public void testTimeOutRecords() {
faultTolerancyBuffer.setTIMEOUT(1000);
StreamRecord record1 = (new StreamRecord(1)).setId("1");
record1.setField(0, new StringValue("V1"));
StreamRecord record2 = (new StreamRecord(1)).setId("1");
record2.setField(0, new StringValue("V2"));
StreamRecord record3 = (new StreamRecord(1)).setId("1");
record3.setField(0, new StringValue("V3"));
faultTolerancyBuffer.addRecord(record1);
faultTolerancyBuffer.addRecord(record2);
try {
Thread.sleep(500);
} catch (Exception e) {
}
faultTolerancyBuffer.addRecord(record3);
Long record1TS=faultTolerancyBuffer.getRecordTimestamps().get(record1.getId());
Long record2TS=faultTolerancyBuffer.getRecordTimestamps().get(record2.getId());
Long record3TS=faultTolerancyBuffer.getRecordTimestamps().get(record3.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
faultTolerancyBuffer.ackRecord(record1.getId());
faultTolerancyBuffer.ackRecord(record2.getId());
faultTolerancyBuffer.ackRecord(record3.getId());
faultTolerancyBuffer.ackRecord(record3.getId());
try {
Thread.sleep(501);
} catch (InterruptedException e) {
}
List<String> timedOutRecords = faultTolerancyBuffer.timeoutRecords(System.currentTimeMillis());
System.out.println("timedOutRecords: "+ timedOutRecords);
assertEquals(1, timedOutRecords.size());
assertFalse(timedOutRecords.contains(record1.getId()));
assertFalse(faultTolerancyBuffer.getRecordsByTime().containsKey(record1TS));
assertFalse(faultTolerancyBuffer.getRecordsByTime().containsKey(record2TS));
assertTrue(faultTolerancyBuffer.getRecordBuffer().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getAckCounter().containsKey(record2.getId()));
assertTrue(faultTolerancyBuffer.getRecordTimestamps().containsKey(record2.getId()));
System.out.println(faultTolerancyBuffer.getAckCounter());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
timedOutRecords = faultTolerancyBuffer.timeoutRecords(System.currentTimeMillis());
assertEquals(null,timedOutRecords);
try {
Thread.sleep(900);
} catch (InterruptedException e) {
}
timedOutRecords = faultTolerancyBuffer.timeoutRecords(System.currentTimeMillis());
assertEquals(2, timedOutRecords.size());
System.out.println(faultTolerancyBuffer.getAckCounter());
System.out.println("---------");
// fail("Not yet implemented");
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册