Commit 58407efb authored by Márton Balassi, committed by Stephan Ewen

[streaming] Javadoc fix

Parent ac08ec2a
......@@ -97,6 +97,8 @@ public class JobGraphBuilder {
* User defined class describing the source
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass, int parallelism,
......@@ -128,6 +130,8 @@ public class JobGraphBuilder {
* User defined class describing the task
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass,
int parallelism, int subtasksPerInstance) {
......@@ -158,6 +162,8 @@ public class JobGraphBuilder {
* User defined class describing the sink
* @param parallelism
* Number of task instances of this type to run in parallel
* @param subtasksPerInstance
* Number of subtasks allocated to a machine
*/
public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass,
int parallelism, int subtasksPerInstance) {
......@@ -223,7 +229,7 @@ public class JobGraphBuilder {
public void setInstanceSharing(String component1, String component2) {
AbstractJobVertex c1 = components.get(component1);
AbstractJobVertex c2 = components.get(component2);
c1.setVertexToShareInstancesWith(c2);
}
......
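
A minimal sketch of how the new subtasksPerInstance argument pairs with parallelism when assembling a job graph. The component names and invokable classes are hypothetical placeholders; the connect calls mirror the WordCount examples later in this commit.

JobGraphBuilder builder = new JobGraphBuilder("exampleGraph");
// 4 source subtasks in total, packed 2 per machine instance
builder.setSource("ExampleSource", ExampleSourceInvokable.class, 4, 2);
// 2 counter subtasks, each on its own instance
builder.setTask("ExampleCounter", ExampleCounterInvokable.class, 2, 1);
builder.setSink("ExampleSink", ExampleSinkInvokable.class, 1, 1);
builder.fieldsConnect("ExampleSource", "ExampleCounter", 0);
builder.shuffleConnect("ExampleCounter", "ExampleSink");
JobGraph jobGraph = builder.getJobGraph();
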
......@@ -80,7 +80,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class,
Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class };
// TODO implement equals, clone
/**
* Creates a new empty instance for read
*/
......@@ -97,7 +96,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this.numOfFields = numOfFields;
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
}
/**
......@@ -113,7 +111,16 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this.numOfTuples = 0;
this.batchSize = batchSize;
tupleBatch = new ArrayList<Tuple>(batchSize);
}
public StreamRecord(StreamRecord record) {
this.numOfFields = record.getNumOfFields();
this.numOfTuples = 0;
tupleBatch = new ArrayList<Tuple>();
this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20));
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.tupleBatch.add(copyTuple(record.getTuple(i)));
}
}
/**
......@@ -131,7 +138,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this.batchSize = batchSize;
tupleBatch = new ArrayList<Tuple>(batchSize);
tupleBatch.add(tuple);
}
/**
......@@ -145,6 +151,10 @@ public class StreamRecord implements IOReadableWritable, Serializable {
this(tuple, 1);
}
public boolean isEmpty() {
return (this.numOfTuples == 0);
}
/**
* @return Number of fields in the tuples
*/
......@@ -499,6 +509,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param o
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setField(int fieldNumber, Object o) throws NoSuchFieldException {
setField(0, fieldNumber, o);
......@@ -514,6 +525,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param o
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
// TODO: consider no such tuple exception and interaction with batch size
public void setField(int tupleNumber, int fieldNumber, Object o) throws NoSuchFieldException {
......@@ -533,6 +545,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param b
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setBoolean(int fieldNumber, Boolean b) throws NoSuchFieldException {
setBoolean(0, fieldNumber, b);
......@@ -549,6 +562,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param b
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setBoolean(int tupleNumber, int fieldNumber, Boolean b) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, b);
......@@ -562,6 +576,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param b
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setByte(int fieldNumber, Byte b) throws NoSuchFieldException {
setByte(0, fieldNumber, b);
......@@ -577,6 +592,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param b
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setByte(int tupleNumber, int fieldNumber, Byte b) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, b);
......@@ -591,6 +607,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param c
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setCharacter(int fieldNumber, Character c) throws NoSuchFieldException {
setCharacter(0, fieldNumber, c);
......@@ -607,6 +624,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param c
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setCharacter(int tupleNumber, int fieldNumber, Character c)
throws NoSuchFieldException {
......@@ -621,6 +639,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param d
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setDouble(int fieldNumber, Double d) throws NoSuchFieldException {
setDouble(0, fieldNumber, d);
......@@ -637,6 +656,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param d
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setDouble(int tupleNumber, int fieldNumber, Double d) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, d);
......@@ -650,6 +670,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param f
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setFloat(int fieldNumber, Float f) throws NoSuchFieldException {
setFloat(0, fieldNumber, f);
......@@ -666,6 +687,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param f
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setFloat(int tupleNumber, int fieldNumber, Float f) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, f);
......@@ -680,6 +702,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param i
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setInteger(int fieldNumber, Integer i) throws NoSuchFieldException {
setInteger(0, fieldNumber, i);
......@@ -696,6 +719,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param i
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setInteger(int tupleNumber, int fieldNumber, Integer i) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, i);
......@@ -709,6 +733,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param l
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setLong(int fieldNumber, Long l) throws NoSuchFieldException {
setLong(0, fieldNumber, l);
......@@ -724,6 +749,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param l
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setLong(int tupleNumber, int fieldNumber, Long l) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, l);
......@@ -737,6 +763,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param s
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setShort(int fieldNumber, Short s) throws NoSuchFieldException {
setShort(0, fieldNumber, s);
......@@ -752,6 +779,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param s
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setShort(int tupleNumber, int fieldNumber, Short s) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, s);
......@@ -765,6 +793,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param str
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setString(int fieldNumber, String str) throws NoSuchFieldException {
setField(0, fieldNumber, str);
......@@ -781,6 +810,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param str
* New value
* @throws NoSuchFieldException
* the Tuple does not have this many fields
*/
public void setString(int tupleNumber, int fieldNumber, String str) throws NoSuchFieldException {
setField(tupleNumber, fieldNumber, str);
......@@ -789,6 +819,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
/**
* @return First tuple of the batch
* @throws NoSuchTupleException
* the StreamRecord does not have this many tuples
*/
public Tuple getTuple() throws NoSuchTupleException {
return getTuple(0);
......@@ -799,6 +830,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Position of the record in the batch
* @return Chosen tuple
* @throws NoSuchTupleException
the StreamRecord does not have this many tuples
*/
public Tuple getTuple(int tupleNumber) throws NoSuchTupleException {
try {
......@@ -855,7 +887,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*
* @param tuple
* Tuple to set
* @throws TupleSizeMismatchException
* @throws NoSuchTupleException
* , TupleSizeMismatchException
*/
public void setTuple(Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException {
setTuple(0, tuple);
......@@ -890,6 +923,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(Tuple tuple) throws TupleSizeMismatchException {
addTuple(numOfTuples, tuple);
......@@ -903,6 +938,8 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Position of the added tuple
* @param tuple
* Tuple to be added as the next record of the batch
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException {
if (tuple.getArity() == numOfFields) {
......@@ -920,6 +957,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* Index of tuple to remove
* @return Removed tuple
* @throws TupleSizeMismatchException
* Tuple specified has illegal size
*/
public Tuple removeTuple(int index) throws TupleSizeMismatchException {
if (index < numOfTuples) {
......@@ -936,6 +974,7 @@ public class StreamRecord implements IOReadableWritable, Serializable {
*
* @return copy of the StreamRecord
* @throws IOException
* Write or read failed
*/
public StreamRecord copySerialized() throws IOException {
ByteArrayOutputStream buff = new ByteArrayOutputStream();
......@@ -972,8 +1011,6 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param tuple
* Tuple to copy
* @return Copy of the tuple
* @throws IllegalAccessException
* @throws InstantiationException
*/
public static Tuple copyTuple(Tuple tuple) {
// TODO: implement deep copy for arrays
......@@ -1028,6 +1065,18 @@ public class StreamRecord implements IOReadableWritable, Serializable {
return newTuple;
}
/**
* Copies tuples from the given record and appends them to the end.
*
* @param record
* record to be appended
*/
public void appendRecord(StreamRecord record) {
for (int i = 0; i < record.getNumOfTuples(); ++i) {
this.addTuple(record.getTuple(i));
}
}
/**
* Converts tuple field types to a byte array
*
......
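
A minimal sketch of the batch-oriented StreamRecord API touched above (the copy constructor, isEmpty, appendRecord, and the two-index field accessors). The fragment is assumed to run inside an invokable's invoke() method, which already declares throws Exception, so the checked field and tuple exceptions need no extra handling; Tuple2 comes from eu.stratosphere.api.java.tuple.

StreamRecord batch = new StreamRecord(2);                    // tuples with 2 fields each
batch.addTuple(new Tuple2<String, Long>("hello", 0L));
batch.addTuple(new Tuple2<String, Long>("world", 1L));

String firstWord = batch.getString(0, 0);                    // "hello"
batch.setLong(1, 1, 42L);                                    // overwrite the second tuple's timestamp

StreamRecord copy = new StreamRecord(batch);                 // copies the UID and every tuple
copy.appendRecord(batch);                                    // copy now holds 4 tuples
boolean empty = copy.isEmpty();                              // false
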
......@@ -15,38 +15,45 @@
package eu.stratosphere.streaming.examples.batch.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
public class BatchWordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(new Tuple3<String, Integer, Long>());
private StreamRecord outRecord = new StreamRecord(3);
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
timestamp = record.getLong(1);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
}
}
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
}
\ No newline at end of file
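
Condensed from BatchWordCountCounter above, a sketch of the count-and-dump pattern: increment the MutableTableState per word, then iterate the whole state to build the output. The input word is a placeholder value.

MutableTableState<String, Integer> counts = new MutableTableState<String, Integer>();

String word = "hello";                                       // hypothetical input
if (counts.containsKey(word)) {
    counts.put(word, counts.get(word) + 1);
} else {
    counts.put(word, 1);
}

MutableTableStateIterator<String, Integer> iterator = counts.getIterator();
while (iterator.hasNext()) {
    Tuple2<String, Integer> entry = iterator.next();
    System.out.println(entry.getField(0) + " -> " + entry.getField(1));
}
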
......@@ -26,12 +26,14 @@ public class BatchWordCountSink extends UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
count = record.getInteger(1);
timestamp = record.getLong(2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
}
}
......@@ -28,9 +28,6 @@ public class BatchWordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line = "";
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Long>());
private final static int BATCH_SIZE = 20;
private Long timestamp = 0L;
public BatchWordCountSource() {
......@@ -39,29 +36,24 @@ public class BatchWordCountSource extends UserSourceInvokable {
} catch (FileNotFoundException e) {
e.printStackTrace();
}
timestamp = 0L;
}
@Override
public void invoke() throws Exception {
timestamp = 0L;
outRecord = new StreamRecord(2);
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
for(int i=0; i<100; ++i) {
line = br.readLine();
if(line==null){
break;
}
if (line != "") {
outRecord.addTuple(new Tuple2<String, Long>(line, timestamp));
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
System.out.println("line="+line);
outRecord.setString(0, line);
outRecord.setLong(1, timestamp);
timestamp++;
if (timestamp % BATCH_SIZE == 0) {
emit(outRecord);
outRecord = new StreamRecord(2);
}
emit(outRecord);
}
line = br.readLine();
}
}
}
\ No newline at end of file
}
......@@ -15,31 +15,26 @@
package eu.stratosphere.streaming.examples.batch.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class BatchWordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple2<String,Long>());
private StreamRecord outputRecord = new StreamRecord(3);
private Long timestamp =0L;
private Long timestamp = 0L;
@Override
public void invoke(StreamRecord record) throws Exception {
int numberOfRecords = record.getNumOfTuples();
for (int i = 0; i < numberOfRecords; ++i) {
words = record.getString(0).split(" ");
timestamp=record.getLong(1);
for (String word : words) {
outputRecord.setString(0, word);
outputRecord.setLong(1, timestamp);
emit(outputRecord);
}
words = record.getString(0).split(" ");
timestamp = record.getLong(1);
System.out.println("sentence=" + record.getString(0) + ", timestamp="
+ record.getLong(1));
for (String word : words) {
Tuple3<String, Integer, Long> tuple =new Tuple3<String, Integer, Long>(word, 1, timestamp);
outputRecord.addTuple(tuple);
}
emit(outputRecord);
}
}
\ No newline at end of file
......@@ -15,51 +15,63 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
import eu.stratosphere.streaming.state.WindowInternalState;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.MutableTableStateIterator;
import eu.stratosphere.streaming.state.WindowState;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize;
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private WindowInternalState<Integer> window;
private MutableInternalState<String, Integer> wordCounts;
private WindowState<Integer> window;
private MutableTableState<String, Integer> wordCounts;
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(
new Tuple3<String, Integer, Long>());
private StreamRecord outRecord = new StreamRecord(3);
public WindowWordCountCounter() {
windowSize = 100;
slidingStep = 20;
window = new WindowInternalState<Integer>(windowSize, slidingStep);
wordCounts = new MutableInternalState<String, Integer>();
computeGranularity = 10;
windowFieldId = 2;
window = new WindowState<Integer>(windowSize, slidingStep,
computeGranularity, windowFieldId);
wordCounts = new MutableTableState<String, Integer>();
}
private void incrementCompute(StreamRecord record) {
word = record.getString(0);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
}
}
}
private void decrementCompute(StreamRecord record) {
word = record.getString(0);
count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
} else {
wordCounts.put(word, count);
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
} else {
wordCounts.put(word, count);
}
}
}
......@@ -71,18 +83,28 @@ public class WindowWordCountCounter extends UserTaskInvokable {
decrementCompute(expiredRecord);
window.pushBack(record);
if (window.isComputable()) {
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
} else {
incrementCompute(record);
window.pushBack(record);
if(window.isFull()){
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
if (window.isFull()) {
MutableTableStateIterator<String, Integer> iterator = wordCounts
.getIterator();
while (iterator.hasNext()) {
Tuple2<String, Integer> tuple = iterator.next();
Tuple3<String, Integer, Long> outputTuple = new Tuple3<String, Integer, Long>(
(String) tuple.getField(0), (Integer) tuple.getField(1), timestamp);
outRecord.addTuple(outputTuple);
}
emit(outRecord);
}
}
......
......@@ -26,12 +26,14 @@ public class WindowWordCountSink extends UserSinkInvokable {
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
count = record.getInteger(1);
timestamp = record.getLong(2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
int numTuple = record.getNumOfTuples();
for (int i = 0; i < numTuple; ++i) {
word = record.getString(i, 0);
count = record.getInteger(i, 1);
timestamp = record.getLong(i, 2);
System.out.println("============================================");
System.out.println(word + " " + count + " " + timestamp);
System.out.println("============================================");
}
}
}
......@@ -26,10 +26,9 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSource extends UserSourceInvokable {
private BufferedReader br = null;
private String line = new String();
private String line = "";
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Long>());
private Long timestamp;
private Long timestamp = 0L;
public WindowWordCountSource() {
try {
......@@ -37,20 +36,24 @@ public class WindowWordCountSource extends UserSourceInvokable {
} catch (FileNotFoundException e) {
e.printStackTrace();
}
timestamp = 0L;
}
@Override
public void invoke() throws Exception {
timestamp = 0L;
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
for(int i=0; i<10; ++i) {
line = br.readLine();
if(line==null){
break;
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
System.out.println("line="+line);
outRecord.setString(0, line);
outRecord.setLong(1, timestamp);
timestamp++;
emit(outRecord);
}
line = br.readLine();
timestamp++;
}
}
}
......@@ -15,14 +15,13 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowWordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple2<String, Long>());
private StreamRecord outputRecord = new StreamRecord(3);
private Long timestamp = 0L;
......@@ -30,13 +29,12 @@ public class WindowWordCountSplitter extends UserTaskInvokable {
public void invoke(StreamRecord record) throws Exception {
words = record.getString(0).split(" ");
timestamp = record.getLong(1);
System.out.println("************sentence=" + words + ", timestamp=" + timestamp
+ "************");
System.out.println("sentence=" + record.getString(0) + ", timestamp="
+ record.getLong(1));
for (String word : words) {
outputRecord.setString(0, word);
outputRecord.setLong(1, timestamp);
emit(outputRecord);
Tuple3<String, Integer, Long> tuple =new Tuple3<String, Integer, Long>(word, 1, timestamp);
outputRecord.addTuple(tuple);
}
emit(outputRecord);
}
}
\ No newline at end of file
......@@ -15,16 +15,14 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
public class WordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private String word = "";
private Integer count = 0;
......
......@@ -28,16 +28,11 @@ import eu.stratosphere.streaming.util.LogUtils;
public class WordCountLocal {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks,
int sinkSubtasksPerInstance) throws Exception {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,
sourceSubtasks, sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
counterSubtasksPerInstance);
graphBuilder.setSink("WordCountSink", WordCountSink.class, sinkSubtasks,
sinkSubtasksPerInstance);
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
......@@ -45,71 +40,44 @@ public class WordCountLocal {
return graphBuilder.getJobGraph();
}
private static void wrongArgs() {
System.out
.println("USAGE:\n"
+ "run <local/cluster> <SOURCE num of subtasks> <SOURCE subtasks per instance> <SPLITTER num of subtasks> <SPLITTER subtasks per instance> <COUNTER num of subtasks> <COUNTER subtasks per instance> <SINK num of subtasks> <SINK subtasks per instance>");
}
// TODO: arguments check
public static void main(String[] args) {
if (args.length != 7) {
wrongArgs();
} else {
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
int sourceSubtasks = 1;
int sourceSubtasksPerInstance = 1;
int counterSubtasks = 1;
int counterSubtasksPerInstance = 1;
int sinkSubtasks = 1;
int sinkSubtasksPerInstance = 1;
try {
sourceSubtasks = Integer.parseInt(args[1]);
sourceSubtasksPerInstance = Integer.parseInt(args[2]);
counterSubtasks = Integer.parseInt(args[3]);
counterSubtasksPerInstance = Integer.parseInt(args[4]);
sinkSubtasks = Integer.parseInt(args[5]);
sinkSubtasksPerInstance = Integer.parseInt(args[6]);
} catch (Exception e) {
wrongArgs();
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
try {
JobGraph jG = getJobGraph(sourceSubtasks, sourceSubtasksPerInstance,
counterSubtasks, counterSubtasksPerInstance, sinkSubtasks,
sinkSubtasksPerInstance);
Configuration configuration = jG.getJobConfiguration();
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
if (args.length == 0) {
args = new String[] { "local" };
}
exec.start();
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
exec.start();
client.run(jG, true);
Client client = new Client(new InetSocketAddress("localhost", 6498),
configuration);
exec.stop();
client.run(jG, true);
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
Client client = new Client(new InetSocketAddress("dell150", 6123),
configuration);
client.run(jG, true);
}
client.run(jG, true);
} catch (Exception e) {
System.out.println(e);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
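
The local-execution branch of the main method above, condensed into a straight-line sketch with the same host, port, and flags as in the diff:

JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();

NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();

Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);                                        // same arguments as above

exec.stop();
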
......@@ -16,7 +16,6 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple1;
......@@ -31,27 +30,20 @@ public class WordCountSource extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 2; i++) {
try {
br = new BufferedReader(new FileReader(
"/home/strato/stratosphere-distrib/resources/hamlet.txt"));
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
outRecord.setString(0, line);
emit(outRecord);
performanceCounter.count();
}
line = br.readLine();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
while (true) {
line = br.readLine();
if (line == null) {
break;
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
outRecord.setString(0, line);
emit(outRecord);
performanceCounter.count();
}
line = br.readLine();
}
}
}
\ No newline at end of file
......@@ -16,7 +16,6 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple1;
......@@ -31,28 +30,22 @@ public class WordCountSourceSplitter extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
br = new BufferedReader(new FileReader(
"src/test/resources/testdata/hamlet.txt"));
while (true) {
try {
br = new BufferedReader(new FileReader(
"/home/strato/stratosphere-distrib/resources/hamlet.txt"));
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
for (String word : line.split(" ")) {
outRecord.setString(0, word);
emit(outRecord);
performanceCounter.count();
}
}
line = br.readLine();
line = br.readLine();
if (line == null) {
break;
}
if (line != "") {
line=line.replaceAll("[\\-\\+\\.\\^:,]", "");
for (String word : line.split(" ")) {
outRecord.setString(0, word);
System.out.println("word=" + word);
emit(outRecord);
performanceCounter.count();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
}
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.wordcount;
import java.net.InetSocketAddress;
import org.apache.log4j.Level;
import eu.stratosphere.client.minicluster.NepheleMiniCluster;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountStarter {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks,
int sinkSubtasksPerInstance) throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class,
sourceSubtasks, sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
counterSubtasksPerInstance);
graphBuilder.setSink("WordCountSink", WordCountSink.class, sinkSubtasks,
sinkSubtasksPerInstance);
graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
private static void wrongArgs() {
System.out
.println("USAGE:\n"
+ "run <local/cluster> <SOURCE num of subtasks> <SOURCE subtasks per instance> <SPLITTER num of subtasks> <SPLITTER subtasks per instance> <COUNTER num of subtasks> <COUNTER subtasks per instance> <SINK num of subtasks> <SINK subtasks per instance>");
}
// TODO: arguments check
public static void main(String[] args) {
if (args.length != 7) {
wrongArgs();
} else {
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
int sourceSubtasks = 1;
int sourceSubtasksPerInstance = 1;
int counterSubtasks = 1;
int counterSubtasksPerInstance = 1;
int sinkSubtasks = 1;
int sinkSubtasksPerInstance = 1;
try {
sourceSubtasks = Integer.parseInt(args[1]);
sourceSubtasksPerInstance = Integer.parseInt(args[2]);
counterSubtasks = Integer.parseInt(args[3]);
counterSubtasksPerInstance = Integer.parseInt(args[4]);
sinkSubtasks = Integer.parseInt(args[5]);
sinkSubtasksPerInstance = Integer.parseInt(args[6]);
} catch (Exception e) {
wrongArgs();
}
try {
JobGraph jG = getJobGraph(sourceSubtasks, sourceSubtasksPerInstance,
counterSubtasks, counterSubtasksPerInstance, sinkSubtasks,
sinkSubtasksPerInstance);
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498),
configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
Client client = new Client(new InetSocketAddress("dell150", 6123),
configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
}
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.index;
/**
* A simple open-source B-tree implementation.
* This implementation does not currently support inserting duplicate keys.
* The file will be reimplemented and optimized for in-memory state.
* Source code website: http://algs4.cs.princeton.edu/62btrees/BTree.java
*/
public class BTreeIndex<Key extends Comparable<Key>, Value> {
private static final int M = 4; // max children per B-tree node = M-1
private Node root; // root of the B-tree
private int HT; // height of the B-tree
private int N; // number of key-value pairs in the B-tree
// helper B-tree node data type
private static final class Node {
private int m; // number of children
private Entry[] children = new Entry[M]; // the array of children
private Node(int k) { m = k; } // create a node with k children
}
// internal nodes: only use key and next
// external nodes: only use key and value
private static class Entry {
private Comparable key;
private Object value;
private Node next; // helper field to iterate over array entries
public Entry(Comparable key, Object value, Node next) {
this.key = key;
this.value = value;
this.next = next;
}
}
// constructor
public BTreeIndex() { root = new Node(0); }
// return number of key-value pairs in the B-tree
public int size() { return N; }
// return height of B-tree
public int height() { return HT; }
// search for given key, return associated value; return null if no such key
public Value get(Key key) { return search(root, key, HT); }
private Value search(Node x, Key key, int ht) {
Entry[] children = x.children;
// external node
if (ht == 0) {
for (int j = 0; j < x.m; j++) {
if (eq(key, children[j].key)) return (Value) children[j].value;
}
}
// internal node
else {
for (int j = 0; j < x.m; j++) {
if (j+1 == x.m || less(key, children[j+1].key))
return search(children[j].next, key, ht-1);
}
}
return null;
}
// insert key-value pair
// add code to check for duplicate keys
public void put(Key key, Value value) {
Node u = insert(root, key, value, HT);
N++;
if (u == null) return;
// need to split root
Node t = new Node(2);
t.children[0] = new Entry(root.children[0].key, null, root);
t.children[1] = new Entry(u.children[0].key, null, u);
root = t;
HT++;
}
private Node insert(Node h, Key key, Value value, int ht) {
int j;
Entry t = new Entry(key, value, null);
// external node
if (ht == 0) {
for (j = 0; j < h.m; j++) {
if (less(key, h.children[j].key)) break;
}
}
// internal node
else {
for (j = 0; j < h.m; j++) {
if ((j+1 == h.m) || less(key, h.children[j+1].key)) {
Node u = insert(h.children[j++].next, key, value, ht-1);
if (u == null) return null;
t.key = u.children[0].key;
t.next = u;
break;
}
}
}
for (int i = h.m; i > j; i--) h.children[i] = h.children[i-1];
h.children[j] = t;
h.m++;
if (h.m < M) return null;
else return split(h);
}
// split node in half
private Node split(Node h) {
Node t = new Node(M/2);
h.m = M/2;
for (int j = 0; j < M/2; j++)
t.children[j] = h.children[M/2+j];
return t;
}
// for debugging
public String toString() {
return toString(root, HT, "") + "\n";
}
private String toString(Node h, int ht, String indent) {
String s = "";
Entry[] children = h.children;
if (ht == 0) {
for (int j = 0; j < h.m; j++) {
s += indent + children[j].key + " " + children[j].value + "\n";
}
}
else {
for (int j = 0; j < h.m; j++) {
if (j > 0) s += indent + "(" + children[j].key + ")\n";
s += toString(children[j].next, ht-1, indent + " ");
}
}
return s;
}
// comparison functions - make Comparable instead of Key to avoid casts
private boolean less(Comparable k1, Comparable k2) {
return k1.compareTo(k2) < 0;
}
private boolean eq(Comparable k1, Comparable k2) {
return k1.compareTo(k2) == 0;
}
}
\ No newline at end of file
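
A brief usage sketch of the B-tree index above; since the class comment notes that duplicate key inserts are not yet supported, each key is put only once here.

BTreeIndex<String, IndexPair> index = new BTreeIndex<String, IndexPair>();
index.put("alpha", new IndexPair(0, 3));
index.put("beta", new IndexPair(1, 7));

IndexPair hit = index.get("alpha");                          // the stored pair
IndexPair miss = index.get("gamma");                         // null: no such key
int entries = index.size();                                  // 2
int height = index.height();                                 // 0 while everything fits in the root
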
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.index;
public class IndexPair{
public IndexPair(int block, int entry){
blockId=block;
entryId=entry;
}
public IndexPair(IndexPair pair){
blockId=pair.blockId;
entryId=pair.entryId;
}
public void setIndexPair(int block, int entry){
blockId=block;
entryId=entry;
}
public void IncrementBlock(){
blockId=blockId+1;
entryId=0;
}
public int blockId;
public int entryId;
}
......@@ -15,80 +15,78 @@
package eu.stratosphere.streaming.state;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.index.IndexPair;
/**
* The window state for window operator. To be general enough, this class
* implements a count based window operator. It is possible for the user to
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
* A log-structured key-value store that accepts any modification operation by
* appending the value to the end of the state.
*/
public class WindowInternalState<K> implements InternalState<K, StreamRecord> {
private int currentRecordNum;
private int fullRecordNum;
private int slideRecordNum;
CircularFifoBuffer buffer;
public class LogTableState<K, V> implements TableState<K, V> {
public WindowInternalState(int windowSize, int slidingStep) {
currentRecordNum = 0;
fullRecordNum = windowSize;
slideRecordNum = slidingStep;
buffer = new CircularFifoBuffer(windowSize);
}
public void pushBack(StreamRecord records) {
buffer.add(records);
currentRecordNum += 1;
}
private HashMap<K, IndexPair> hashMap = new HashMap<K, IndexPair>();
private HashMap<Integer, ArrayList<V>> blockList = new HashMap<Integer, ArrayList<V>>();
private final int perBlockEntryCount = 1000;
private IndexPair nextInsertPos = new IndexPair(-1, -1);
public StreamRecord popFront() {
StreamRecord frontRecord=(StreamRecord) buffer.get();
buffer.remove();
return frontRecord;
public LogTableState() {
blockList.put(0, new ArrayList<V>());
nextInsertPos.setIndexPair(0, 0);
}
public boolean isFull() {
return currentRecordNum >= fullRecordNum;
@Override
public void put(K key, V value) {
// TODO Auto-generated method stub
if (nextInsertPos.entryId == perBlockEntryCount) {
blockList.put(nextInsertPos.blockId + 1, new ArrayList<V>());
nextInsertPos.IncrementBlock();
}
blockList.get(nextInsertPos.blockId).add(value);
hashMap.put(key, new IndexPair(nextInsertPos));
nextInsertPos.entryId += 1;
}
public boolean isComputable() {
if (currentRecordNum == fullRecordNum + slideRecordNum) {
currentRecordNum -= slideRecordNum;
return true;
@Override
public V get(K key) {
// TODO Auto-generated method stub
IndexPair index = hashMap.get(key);
if (index == null) {
return null;
} else {
return blockList.get(index.blockId).get(index.entryId);
}
return false;
}
@Override
public void put(K key, StreamRecord value) {
public void delete(K key) {
// TODO Auto-generated method stub
hashMap.remove(key);
}
@Override
public StreamRecord get(K key) {
public boolean containsKey(K key) {
// TODO Auto-generated method stub
return null;
return hashMap.containsKey(key);
}
@Override
public void delete(K key) {
public String serialize() {
// TODO Auto-generated method stub
return null;
}
@Override
public boolean containsKey(K key) {
public void deserialize(String str) {
// TODO Auto-generated method stub
return false;
}
@Override
public StateIterator<K, StreamRecord> getIterator() {
public TableStateIterator<K, V> getIterator() {
// TODO Auto-generated method stub
return null;
return new LogTableStateIterator<K, V>(hashMap.entrySet().iterator(), blockList);
}
}
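
A small sketch of the log-structured semantics above: put appends the value to the current block and repoints the key's IndexPair, so a later put for the same key shadows the earlier one while the old entry stays in the log, and delete only drops the index entry.

LogTableState<String, String> log = new LogTableState<String, String>();
log.put("config", "v1");
log.put("config", "v2");                                     // appended; the index now points at "v2"

String current = log.get("config");                          // "v2"
log.delete("config");                                        // removes only the index entry
String afterDelete = log.get("config");                      // null
boolean present = log.containsKey("config");                 // false
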
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.index.IndexPair;
public class LogTableStateIterator<K, V> implements TableStateIterator<K ,V>{
private Iterator<Entry<K, IndexPair>> iterator;
private HashMap<Integer, ArrayList<V>> blockList;
public LogTableStateIterator(Iterator<Entry<K, IndexPair>> iter, HashMap<Integer, ArrayList<V>> blocks){
iterator=iter;
blockList=blocks;
}
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return false;
}
@Override
public Tuple2<K, V> next() {
// TODO Auto-generated method stub
return null;
}
}
......@@ -21,7 +21,7 @@ import java.util.Map;
/**
* The most general internal state that stores data in a mutable map.
*/
public class MutableInternalState<K, V> implements InternalState<K, V> {
public class MutableTableState<K, V> implements TableState<K, V> {
private Map<K, V> state=new LinkedHashMap<K, V>();
@Override
......@@ -49,9 +49,21 @@ public class MutableInternalState<K, V> implements InternalState<K, V> {
}
@Override
public StateIterator<K, V> getIterator() {
public MutableTableStateIterator<K, V> getIterator() {
// TODO Auto-generated method stub
return new MutableStateIterator<K, V>(state.entrySet().iterator());
return new MutableTableStateIterator<K, V>(state.entrySet().iterator());
}
@Override
public String serialize() {
// TODO Auto-generated method stub
return null;
}
@Override
public void deserialize(String str) {
// TODO Auto-generated method stub
}
}
......@@ -20,10 +20,10 @@ import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2;
public class MutableStateIterator<K, V> implements StateIterator<K, V>{
public class MutableTableStateIterator<K, V> implements TableStateIterator<K, V>{
private Iterator<Entry<K, V>> iterator;
public MutableStateIterator(Iterator<Entry<K, V>> iter){
public MutableTableStateIterator(Iterator<Entry<K, V>> iter){
iterator=iter;
}
......
......@@ -18,10 +18,12 @@ package eu.stratosphere.streaming.state;
/**
* An internal state interface that supports stateful operator.
*/
public interface InternalState<K, V> {
public interface TableState<K, V> {
public void put(K key, V value);
public V get(K key);
public void delete(K key);
public boolean containsKey(K key);
public StateIterator<K, V> getIterator();
public String serialize();
public void deserialize(String str);
public TableStateIterator<K, V> getIterator();
}
......@@ -20,7 +20,7 @@ import eu.stratosphere.api.java.tuple.Tuple2;
/**
* the iterator for internal states.
*/
public interface StateIterator<K, V>{
public interface TableStateIterator<K, V>{
public boolean hasNext();
public Tuple2<K, V> next();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.HashMap;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.index.IndexPair;
/**
* The window state for window operator. To be general enough, this class
* implements a count based window operator. It is possible for the user to
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
public class WindowState<K> {
private int windowSize;
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private int initTimestamp;
private int nextTimestamp;
private int currentRecordCount;
private int fullRecordCount;
private int slideRecordCount;
HashMap<K, IndexPair> windowIndex;
CircularFifoBuffer buffer;
StreamRecord tempRecord;
public WindowState(int windowSize, int slidingStep, int computeGranularity,
int windowFieldId) {
this.windowSize = windowSize;
this.slidingStep = slidingStep;
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
this.initTimestamp = -1;
this.nextTimestamp = -1;
this.currentRecordCount = 0;
// here we assume that windowSize and slidingStep is divisible by
// computeGranularity.
this.fullRecordCount = windowSize / computeGranularity;
this.slideRecordCount = slidingStep / computeGranularity;
this.windowIndex = new HashMap<K, IndexPair>();
this.buffer = new CircularFifoBuffer(fullRecordCount);
}
public void pushBack(StreamRecord record) {
if (initTimestamp == -1) {
initTimestamp = (Integer) record.getTuple(0).getField(windowFieldId);
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
for (int i = 0; i < record.getNumOfTuples(); ++i) {
while ((Integer) record.getTuple(i).getField(windowFieldId) > nextTimestamp) {
buffer.add(tempRecord);
currentRecordCount += 1;
tempRecord = new StreamRecord(record.getNumOfFields());
}
tempRecord.addTuple(record.getTuple(i));
}
}
public StreamRecord popFront() {
StreamRecord frontRecord = (StreamRecord) buffer.get();
buffer.remove();
return frontRecord;
}
public boolean isFull() {
return currentRecordCount >= fullRecordCount;
}
public boolean isComputable() {
if (currentRecordCount == fullRecordCount + slideRecordCount) {
currentRecordCount -= slideRecordCount;
return true;
}
return false;
}
}
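
A minimal sketch of driving WindowState, using the same parameters as WindowWordCountCounter (window 100, slide 20, granularity 10, timestamp in field 2). The eviction branch is inferred from the counter's invoke method, whose opening lines are not shown in that hunk; the aggregate here is deliberately trivial (a tuple count) so the sketch stays self-contained.

// fields of a UserTaskInvokable:
WindowState<Integer> window = new WindowState<Integer>(100, 20, 10, 2);
int tuplesInWindow = 0;

// inside invoke(StreamRecord record):
if (window.isFull()) {
    StreamRecord expired = window.popFront();                // evict the oldest mini-batch
    tuplesInWindow -= expired.getNumOfTuples();
    tuplesInWindow += record.getNumOfTuples();
    window.pushBack(record);
    if (window.isComputable()) {
        System.out.println("window count: " + tuplesInWindow);   // stand-in for emit(outRecord)
    }
} else {
    tuplesInWindow += record.getNumOfTuples();
    window.pushBack(record);
    if (window.isFull()) {
        System.out.println("window count: " + tuplesInWindow);
    }
}
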
......@@ -18,15 +18,13 @@ package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowStateIterator<K> implements StateIterator<K, StreamRecord>{
public class WindowStateIterator<K>{
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return false;
}
@Override
public Tuple2<K, StreamRecord> next() {
// TODO Auto-generated method stub
return null;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.index;
import org.junit.Test;
import eu.stratosphere.streaming.index.BTreeIndex;
import eu.stratosphere.streaming.index.IndexPair;
public class BTreeIndexTest {
@Test
public void bTreeIndexOperationTest(){
BTreeIndex<String, IndexPair> btree=new BTreeIndex<String, IndexPair>();
btree.put("abc", new IndexPair(7, 3));
btree.put("abc", new IndexPair(1, 2));
btree.put("def", new IndexPair(6, 3));
btree.put("ghi", new IndexPair(3, 6));
btree.put("jkl", new IndexPair(4, 7));
System.out.println(btree.get("abc").blockId+", "+btree.get("abc").entryId);
}
}
......@@ -13,37 +13,77 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.wordcount;
package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
public class WordCountKvCounter extends UserTaskInvokable {
private MutableInternalState<String, Integer> wordCounts = new MutableInternalState<String, Integer>();
private String word = "";
private Integer count = 0;
import org.junit.Test;
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.LogTableState;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.TableStateIterator;
import eu.stratosphere.streaming.state.WindowState;
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
public class InternalStateTest {
@Test
public void MutableTableStateTest(){
MutableTableState<String, String> state=new MutableTableState<String, String>();
state.put("abc", "hello");
state.put("test", "world");
state.put("state", "mutable");
state.put("streaming", "persist");
String s=state.get("streaming");
if(s==null){
System.out.println("key does not exist!");
}
outRecord.setString(0, word);
outRecord.setInteger(1, count);
emit(outRecord);
performanceCounter.count();
else{
System.out.println("value="+s);
}
s=state.get("null");
if(s==null){
System.out.println("key does not exist!");
}
else{
System.out.println("value="+s);
}
TableStateIterator<String, String> iterator=state.getIterator();
while(iterator.hasNext()){
Tuple2<String, String> tuple=iterator.next();
System.out.println(tuple.getField(0)+", "+tuple.getField(1));
}
}
@Test
public void LogTableStateTest(){
LogTableState<String, String> state=new LogTableState<String, String>();
state.put("abc", "hello");
state.put("test", "world");
state.put("state", "mutable");
state.put("streaming", "persist");
String s=state.get("streaming");
if(s==null){
System.out.println("key does not exist!");
}
else{
System.out.println("value="+s);
}
s=state.get("null");
if(s==null){
System.out.println("key does not exist!");
}
else{
System.out.println("value="+s);
}
TableStateIterator<String, String> iterator=state.getIterator();
while(iterator.hasNext()){
Tuple2<String, String> tuple=iterator.next();
System.out.println(tuple.getField(0)+", "+tuple.getField(1));
}
}
@Test
public void WindowStateTest(){
WindowState<String> state=new WindowState<String>(100, 20, 10, 2);
}
}