diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java index 08ded761a15bdaf40ec6fa212fe2c1c75f1a3f2a..05d0ca9632090b4afcde288372dcb09863b32ac3 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/JobGraphBuilder.java @@ -97,6 +97,8 @@ public class JobGraphBuilder { * User defined class describing the source * @param parallelism * Number of task instances of this type to run in parallel + * @param subtasksPerInstance + * Number of subtasks allocated to a machine */ public void setSource(String sourceName, final Class InvokableClass, int parallelism, @@ -128,6 +130,8 @@ public class JobGraphBuilder { * User defined class describing the task * @param parallelism * Number of task instances of this type to run in parallel + * @param subtasksPerInstance + * Number of subtasks allocated to a machine */ public void setTask(String taskName, final Class InvokableClass, int parallelism, int subtasksPerInstance) { @@ -158,6 +162,8 @@ public class JobGraphBuilder { * User defined class describing the sink * @param parallelism * Number of task instances of this type to run in parallel + * @param subtasksPerInstance + * Number of subtasks allocated to a machine */ public void setSink(String sinkName, final Class InvokableClass, int parallelism, int subtasksPerInstance) { @@ -223,7 +229,7 @@ public class JobGraphBuilder { public void setInstanceSharing(String component1, String component2) { AbstractJobVertex c1 = components.get(component1); AbstractJobVertex c2 = components.get(component2); - + c1.setVertexToShareInstancesWith(c2); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java index ebeb81dd3f232524d422d74ea8c947d2fee8e608..b0aff4701bde7205a42a3a571d1cf1badadb21c2 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/api/streamrecord/StreamRecord.java @@ -80,7 +80,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class }; - // TODO implement equals, clone /** * Creates a new empty instance for read */ @@ -97,7 +96,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { this.numOfFields = numOfFields; this.numOfTuples = 0; tupleBatch = new ArrayList(); - } /** @@ -113,7 +111,16 @@ public class StreamRecord implements IOReadableWritable, Serializable { this.numOfTuples = 0; this.batchSize = batchSize; tupleBatch = new ArrayList(batchSize); + } + public StreamRecord(StreamRecord record) { + this.numOfFields = record.getNumOfFields(); + this.numOfTuples = 0; + tupleBatch = new ArrayList(); + this.uid = new UID(Arrays.copyOf(record.getId().getId(), 20)); + for (int i = 0; i < record.getNumOfTuples(); ++i) { + this.tupleBatch.add(copyTuple(record.getTuple(i))); + } } /** @@ -131,7 +138,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { this.batchSize = batchSize; tupleBatch = new ArrayList(batchSize); tupleBatch.add(tuple); - } /** @@ -145,6 +151,10 @@ public class StreamRecord implements IOReadableWritable, Serializable { this(tuple, 1); } + public boolean isEmpty() { + return (this.numOfTuples == 0); + } + /** * @return Number of fields in the tuples */ @@ -499,6 +509,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param o * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setField(int fieldNumber, Object o) throws NoSuchFieldException { setField(0, fieldNumber, o); @@ -514,6 +525,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param o * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ // TODO: consider no such tuple exception and interaction with batch size public void setField(int tupleNumber, int fieldNumber, Object o) throws NoSuchFieldException { @@ -533,6 +545,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param b * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setBoolean(int fieldNumber, Boolean b) throws NoSuchFieldException { setBoolean(0, fieldNumber, b); @@ -549,6 +562,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param b * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setBoolean(int tupleNumber, int fieldNumber, Boolean b) throws NoSuchFieldException { setField(tupleNumber, fieldNumber, b); @@ -562,6 +576,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param b * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setByte(int fieldNumber, Byte b) throws NoSuchFieldException { setByte(0, fieldNumber, b); @@ -577,6 +592,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param b * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setByte(int tupleNumber, int fieldNumber, Byte b) throws NoSuchFieldException { setField(tupleNumber, fieldNumber, b); @@ -591,6 +607,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param c * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setCharacter(int fieldNumber, Character c) throws NoSuchFieldException { setCharacter(0, fieldNumber, c); @@ -607,6 +624,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param c * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setCharacter(int tupleNumber, int fieldNumber, Character c) throws NoSuchFieldException { @@ -621,6 +639,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param d * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setDouble(int fieldNumber, Double d) throws NoSuchFieldException { setDouble(0, fieldNumber, d); @@ -637,6 +656,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param d * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setDouble(int tupleNumber, int fieldNumber, Double d) throws NoSuchFieldException { setField(tupleNumber, fieldNumber, d); @@ -650,6 +670,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param f * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setFloat(int fieldNumber, Float f) throws NoSuchFieldException { setFloat(0, fieldNumber, f); @@ -666,6 +687,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param f * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setFloat(int tupleNumber, int fieldNumber, Float f) throws NoSuchFieldException { setField(tupleNumber, fieldNumber, f); @@ -680,6 +702,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param i * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setInteger(int fieldNumber, Integer i) throws NoSuchFieldException { setInteger(0, fieldNumber, i); @@ -696,6 +719,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param i * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setInteger(int tupleNumber, int fieldNumber, Integer i) throws NoSuchFieldException { setField(tupleNumber, fieldNumber, i); @@ -709,6 +733,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param l * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setLong(int fieldNumber, Long l) throws NoSuchFieldException { setLong(0, fieldNumber, l); @@ -724,6 +749,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param l * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setLong(int tupleNumber, int fieldNumber, Long l) throws NoSuchFieldException { setField(tupleNumber, fieldNumber, l); @@ -737,6 +763,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param s * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setShort(int fieldNumber, Short s) throws NoSuchFieldException { setShort(0, fieldNumber, s); @@ -752,6 +779,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param s * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setShort(int tupleNumber, int fieldNumber, Short s) throws NoSuchFieldException { setField(tupleNumber, fieldNumber, s); @@ -765,6 +793,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param str * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setString(int fieldNumber, String str) throws NoSuchFieldException { setField(0, fieldNumber, str); @@ -781,6 +810,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param str * New value * @throws NoSuchFieldException + * the Tuple does not have this many fields */ public void setString(int tupleNumber, int fieldNumber, String str) throws NoSuchFieldException { setField(tupleNumber, fieldNumber, str); @@ -789,6 +819,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { /** * @return First tuple of the batch * @throws NoSuchTupleException + * the StreamRecord does not have this many tuples */ public Tuple getTuple() throws NoSuchTupleException { return getTuple(0); @@ -799,6 +830,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Position of the record in the batch * @return Chosen tuple * @throws NoSuchTupleException + * the Tuple does not have this many fields */ public Tuple getTuple(int tupleNumber) throws NoSuchTupleException { try { @@ -855,7 +887,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { * * @param tuple * Tuple to set - * @throws TupleSizeMismatchException + * @throws NoSuchTupleException + * , TupleSizeMismatchException */ public void setTuple(Tuple tuple) throws NoSuchTupleException, TupleSizeMismatchException { setTuple(0, tuple); @@ -890,6 +923,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { * * @param tuple * Tuple to be added as the next record of the batch + * @throws TupleSizeMismatchException + * Tuple specified has illegal size */ public void addTuple(Tuple tuple) throws TupleSizeMismatchException { addTuple(numOfTuples, tuple); @@ -903,6 +938,8 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Position of the added tuple * @param tuple * Tuple to be added as the next record of the batch + * @throws TupleSizeMismatchException + * Tuple specified has illegal size */ public void addTuple(int index, Tuple tuple) throws TupleSizeMismatchException { if (tuple.getArity() == numOfFields) { @@ -920,6 +957,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * Index of tuple to remove * @return Removed tuple * @throws TupleSizeMismatchException + * Tuple specified has illegal size */ public Tuple removeTuple(int index) throws TupleSizeMismatchException { if (index < numOfTuples) { @@ -936,6 +974,7 @@ public class StreamRecord implements IOReadableWritable, Serializable { * * @return copy of the StreamRecord * @throws IOException + * Write or read failed */ public StreamRecord copySerialized() throws IOException { ByteArrayOutputStream buff = new ByteArrayOutputStream(); @@ -972,8 +1011,6 @@ public class StreamRecord implements IOReadableWritable, Serializable { * @param tuple * Tuple to copy * @return Copy of the tuple - * @throws IllegalAccessException - * @throws InstantiationException */ public static Tuple copyTuple(Tuple tuple) { // TODO: implement deep copy for arrays @@ -1028,6 +1065,18 @@ public class StreamRecord implements IOReadableWritable, Serializable { return newTuple; } + /** + * copy tuples from the given record and append them to the end. + * + * @param record + * record to be appended + */ + public void appendRecord(StreamRecord record) { + for (int i = 0; i < record.getNumOfTuples(); ++i) { + this.addTuple(record.getTuple(i)); + } + } + /** * Converts tuple field types to a byte array * diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountCounter.java index 7ca3fe190f3f96edca043350ef04f8906d8c9438..498ab4c315ef3f344e72622d904ba90fcdc174f9 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountCounter.java @@ -15,38 +15,45 @@ package eu.stratosphere.streaming.examples.batch.wordcount; -import java.util.HashMap; -import java.util.Map; - +import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.state.MutableTableState; +import eu.stratosphere.streaming.state.MutableTableStateIterator; public class BatchWordCountCounter extends UserTaskInvokable { - private Map wordCounts = new HashMap(); + private MutableTableState wordCounts = new MutableTableState(); private String word = ""; private Integer count = 0; private Long timestamp = 0L; - private StreamRecord outRecord = new StreamRecord(new Tuple3()); + private StreamRecord outRecord = new StreamRecord(3); @Override public void invoke(StreamRecord record) throws Exception { - word = record.getString(0); - timestamp = record.getLong(1); - - if (wordCounts.containsKey(word)) { - count = wordCounts.get(word) + 1; - wordCounts.put(word, count); - } else { - count = 1; - wordCounts.put(word, 1); + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + count = record.getInteger(i, 1); + timestamp = record.getLong(i, 2); + if (wordCounts.containsKey(word)) { + count = wordCounts.get(word) + 1; + wordCounts.put(word, count); + } else { + count = 1; + wordCounts.put(word, 1); + } } - outRecord.setString(0, word); - outRecord.setInteger(1, count); - outRecord.setLong(2, timestamp); - + MutableTableStateIterator iterator = wordCounts + .getIterator(); + while (iterator.hasNext()) { + Tuple2 tuple = iterator.next(); + Tuple3 outputTuple = new Tuple3( + (String) tuple.getField(0), (Integer) tuple.getField(1), timestamp); + outRecord.addTuple(outputTuple); + } emit(outRecord); } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java index 597b0aaeeb4d49b12b8cf484caa14bec24bfb654..cac944fb86013d00ff298123af6964007356e21b 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSink.java @@ -26,12 +26,14 @@ public class BatchWordCountSink extends UserSinkInvokable { @Override public void invoke(StreamRecord record) throws Exception { - word = record.getString(0); - count = record.getInteger(1); - timestamp = record.getLong(2); - System.out.println("============================================"); - System.out.println(word + " " + count + " " + timestamp); - System.out.println("============================================"); - + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + count = record.getInteger(i, 1); + timestamp = record.getLong(i, 2); + System.out.println("============================================"); + System.out.println(word + " " + count + " " + timestamp); + System.out.println("============================================"); + } } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java index 549b781b784ee7fc8ac7173bdf60fa6b8670153f..d8b167425c0ec3b9e388b5f5fe373ff1abc3a395 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSource.java @@ -28,9 +28,6 @@ public class BatchWordCountSource extends UserSourceInvokable { private BufferedReader br = null; private String line = ""; private StreamRecord outRecord = new StreamRecord(new Tuple2()); - - private final static int BATCH_SIZE = 20; - private Long timestamp = 0L; public BatchWordCountSource() { @@ -39,29 +36,24 @@ public class BatchWordCountSource extends UserSourceInvokable { } catch (FileNotFoundException e) { e.printStackTrace(); } + timestamp = 0L; } @Override public void invoke() throws Exception { - timestamp = 0L; - outRecord = new StreamRecord(2); - - line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); - - while (line != null) { + for(int i=0; i<100; ++i) { + line = br.readLine(); + if(line==null){ + break; + } if (line != "") { - - outRecord.addTuple(new Tuple2(line, timestamp)); + line=line.replaceAll("[\\-\\+\\.\\^:,]", ""); + System.out.println("line="+line); + outRecord.setString(0, line); + outRecord.setLong(1, timestamp); timestamp++; - if (timestamp % BATCH_SIZE == 0) { - emit(outRecord); - outRecord = new StreamRecord(2); - } + emit(outRecord); } - - line = br.readLine(); - } } - -} \ No newline at end of file +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java index 7988607991a81a70d39afb9d973fabe465a21ded..6b4331d4b4d87135b6bf8bd84960d8a26c23b2a0 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/batch/wordcount/BatchWordCountSplitter.java @@ -15,31 +15,26 @@ package eu.stratosphere.streaming.examples.batch.wordcount; -import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class BatchWordCountSplitter extends UserTaskInvokable { - - - private String[] words = new String[] {}; - private StreamRecord outputRecord = new StreamRecord(new Tuple2()); + private StreamRecord outputRecord = new StreamRecord(3); - private Long timestamp =0L; - + private Long timestamp = 0L; @Override public void invoke(StreamRecord record) throws Exception { - int numberOfRecords = record.getNumOfTuples(); - for (int i = 0; i < numberOfRecords; ++i) { - words = record.getString(0).split(" "); - timestamp=record.getLong(1); - for (String word : words) { - outputRecord.setString(0, word); - outputRecord.setLong(1, timestamp); - emit(outputRecord); - } + words = record.getString(0).split(" "); + timestamp = record.getLong(1); + System.out.println("sentence=" + record.getString(0) + ", timestamp=" + + record.getLong(1)); + for (String word : words) { + Tuple3 tuple =new Tuple3(word, 1, timestamp); + outputRecord.addTuple(tuple); } + emit(outputRecord); } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java index 16b894fb364225154f1d41501d55a0d749960d8c..6f8b95586a87abf504fa9393dfcbd03ee03b0dab 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountCounter.java @@ -15,51 +15,63 @@ package eu.stratosphere.streaming.examples.window.wordcount; +import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; -import eu.stratosphere.streaming.state.MutableInternalState; -import eu.stratosphere.streaming.state.WindowInternalState; +import eu.stratosphere.streaming.state.MutableTableState; +import eu.stratosphere.streaming.state.MutableTableStateIterator; +import eu.stratosphere.streaming.state.WindowState; public class WindowWordCountCounter extends UserTaskInvokable { private int windowSize; private int slidingStep; + private int computeGranularity; + private int windowFieldId; - private WindowInternalState window; - private MutableInternalState wordCounts; + private WindowState window; + private MutableTableState wordCounts; private String word = ""; private Integer count = 0; private Long timestamp = 0L; - private StreamRecord outRecord = new StreamRecord( - new Tuple3()); + private StreamRecord outRecord = new StreamRecord(3); public WindowWordCountCounter() { windowSize = 100; slidingStep = 20; - window = new WindowInternalState(windowSize, slidingStep); - wordCounts = new MutableInternalState(); + computeGranularity = 10; + windowFieldId = 2; + window = new WindowState(windowSize, slidingStep, + computeGranularity, windowFieldId); + wordCounts = new MutableTableState(); } private void incrementCompute(StreamRecord record) { - word = record.getString(0); - if (wordCounts.containsKey(word)) { - count = wordCounts.get(word) + 1; - wordCounts.put(word, count); - } else { - count = 1; - wordCounts.put(word, 1); + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + if (wordCounts.containsKey(word)) { + count = wordCounts.get(word) + 1; + wordCounts.put(word, count); + } else { + count = 1; + wordCounts.put(word, 1); + } } } private void decrementCompute(StreamRecord record) { - word = record.getString(0); - count = wordCounts.get(word) - 1; - if (count == 0) { - wordCounts.delete(word); - } else { - wordCounts.put(word, count); + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + count = wordCounts.get(word) - 1; + if (count == 0) { + wordCounts.delete(word); + } else { + wordCounts.put(word, count); + } } } @@ -71,18 +83,28 @@ public class WindowWordCountCounter extends UserTaskInvokable { decrementCompute(expiredRecord); window.pushBack(record); if (window.isComputable()) { - outRecord.setString(0, word); - outRecord.setInteger(1, count); - outRecord.setLong(2, timestamp); + MutableTableStateIterator iterator = wordCounts + .getIterator(); + while (iterator.hasNext()) { + Tuple2 tuple = iterator.next(); + Tuple3 outputTuple = new Tuple3( + (String) tuple.getField(0), (Integer) tuple.getField(1), timestamp); + outRecord.addTuple(outputTuple); + } emit(outRecord); } } else { incrementCompute(record); window.pushBack(record); - if(window.isFull()){ - outRecord.setString(0, word); - outRecord.setInteger(1, count); - outRecord.setLong(2, timestamp); + if (window.isFull()) { + MutableTableStateIterator iterator = wordCounts + .getIterator(); + while (iterator.hasNext()) { + Tuple2 tuple = iterator.next(); + Tuple3 outputTuple = new Tuple3( + (String) tuple.getField(0), (Integer) tuple.getField(1), timestamp); + outRecord.addTuple(outputTuple); + } emit(outRecord); } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java index 40ccf76b430b793831fa1a08301612bce5a19be2..4e4977f54eb0156fc8a0ef0b5d659d3145a4b40c 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSink.java @@ -26,12 +26,14 @@ public class WindowWordCountSink extends UserSinkInvokable { @Override public void invoke(StreamRecord record) throws Exception { - word = record.getString(0); - count = record.getInteger(1); - timestamp = record.getLong(2); - System.out.println("============================================"); - System.out.println(word + " " + count + " " + timestamp); - System.out.println("============================================"); - + int numTuple = record.getNumOfTuples(); + for (int i = 0; i < numTuple; ++i) { + word = record.getString(i, 0); + count = record.getInteger(i, 1); + timestamp = record.getLong(i, 2); + System.out.println("============================================"); + System.out.println(word + " " + count + " " + timestamp); + System.out.println("============================================"); + } } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java index 99c3be2674244b431933fa604ca43520fdd710b6..952656944f2c5ca90686c2d0274c623f1c9ae41b 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSource.java @@ -26,10 +26,9 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class WindowWordCountSource extends UserSourceInvokable { private BufferedReader br = null; - private String line = new String(); + private String line = ""; private StreamRecord outRecord = new StreamRecord(new Tuple2()); - - private Long timestamp; + private Long timestamp = 0L; public WindowWordCountSource() { try { @@ -37,20 +36,24 @@ public class WindowWordCountSource extends UserSourceInvokable { } catch (FileNotFoundException e) { e.printStackTrace(); } + timestamp = 0L; } @Override public void invoke() throws Exception { - timestamp = 0L; - line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); - while (line != null) { + for(int i=0; i<10; ++i) { + line = br.readLine(); + if(line==null){ + break; + } if (line != "") { + line=line.replaceAll("[\\-\\+\\.\\^:,]", ""); + System.out.println("line="+line); outRecord.setString(0, line); outRecord.setLong(1, timestamp); + timestamp++; emit(outRecord); } - line = br.readLine(); - timestamp++; } } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java index 32aaa1374750095f41d28973bbf03b9d5b2df665..a2c80c54d2bf70b0456c7cac082ae2fefd3c7494 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/window/wordcount/WindowWordCountSplitter.java @@ -15,14 +15,13 @@ package eu.stratosphere.streaming.examples.window.wordcount; -import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.api.java.tuple.Tuple3; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; public class WindowWordCountSplitter extends UserTaskInvokable { - private String[] words = new String[] {}; - private StreamRecord outputRecord = new StreamRecord(new Tuple2()); + private StreamRecord outputRecord = new StreamRecord(3); private Long timestamp = 0L; @@ -30,13 +29,12 @@ public class WindowWordCountSplitter extends UserTaskInvokable { public void invoke(StreamRecord record) throws Exception { words = record.getString(0).split(" "); timestamp = record.getLong(1); - System.out.println("************sentence=" + words + ", timestamp=" + timestamp - + "************"); + System.out.println("sentence=" + record.getString(0) + ", timestamp=" + + record.getLong(1)); for (String word : words) { - outputRecord.setString(0, word); - outputRecord.setLong(1, timestamp); - emit(outputRecord); + Tuple3 tuple =new Tuple3(word, 1, timestamp); + outputRecord.addTuple(tuple); } - + emit(outputRecord); } } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java index b66f7755cfdac127f3e25c65d564f62f4a60ac83..c763274d0436385a8e89f6975cec9e3be2d706c1 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java @@ -15,16 +15,14 @@ package eu.stratosphere.streaming.examples.wordcount; -import java.util.HashMap; -import java.util.Map; - import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.state.MutableTableState; public class WordCountCounter extends UserTaskInvokable { - private Map wordCounts = new HashMap(); + private MutableTableState wordCounts = new MutableTableState(); private String word = ""; private Integer count = 0; diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountKvCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountKvCounter.java deleted file mode 100644 index cc65c327c8355c4ebfd7aab5e4e34eda8152c899..0000000000000000000000000000000000000000 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountKvCounter.java +++ /dev/null @@ -1,49 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ - -package eu.stratosphere.streaming.examples.wordcount; - -import eu.stratosphere.api.java.tuple.Tuple2; -import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; -import eu.stratosphere.streaming.api.streamrecord.StreamRecord; -import eu.stratosphere.streaming.state.MutableInternalState; - -public class WordCountKvCounter extends UserTaskInvokable { - - private MutableInternalState wordCounts = new MutableInternalState(); - private String word = ""; - private Integer count = 0; - - private StreamRecord outRecord = new StreamRecord(new Tuple2()); - - @Override - public void invoke(StreamRecord record) throws Exception { - word = record.getString(0); - - if (wordCounts.containsKey(word)) { - count = wordCounts.get(word) + 1; - wordCounts.put(word, count); - } else { - count = 1; - wordCounts.put(word, 1); - } - - outRecord.setString(0, word); - outRecord.setInteger(1, count); - - emit(outRecord); - performanceCounter.count(); - } -} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java index 7d023a6aed8523fe755a6c80c19fb47bf1fa1959..78a590c9e3c148b5691d23608fd9e526017773b4 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountLocal.java @@ -28,16 +28,11 @@ import eu.stratosphere.streaming.util.LogUtils; public class WordCountLocal { - private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance, - int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks, - int sinkSubtasksPerInstance) throws Exception { + public static JobGraph getJobGraph() { JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); - graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class, - sourceSubtasks, sourceSubtasksPerInstance); - graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks, - counterSubtasksPerInstance); - graphBuilder.setSink("WordCountSink", WordCountSink.class, sinkSubtasks, - sinkSubtasksPerInstance); + graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class); + graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 1, 1); + graphBuilder.setSink("WordCountSink", WordCountSink.class); graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0); graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink"); @@ -45,71 +40,44 @@ public class WordCountLocal { return graphBuilder.getJobGraph(); } - private static void wrongArgs() { - System.out - .println("USAGE:\n" - + "run "); - } - - // TODO: arguments check public static void main(String[] args) { - if (args.length != 7) { - wrongArgs(); - } else { - LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO); - - int sourceSubtasks = 1; - int sourceSubtasksPerInstance = 1; - int counterSubtasks = 1; - int counterSubtasksPerInstance = 1; - int sinkSubtasks = 1; - int sinkSubtasksPerInstance = 1; - - try { - sourceSubtasks = Integer.parseInt(args[1]); - sourceSubtasksPerInstance = Integer.parseInt(args[2]); - counterSubtasks = Integer.parseInt(args[3]); - counterSubtasksPerInstance = Integer.parseInt(args[4]); - sinkSubtasks = Integer.parseInt(args[5]); - sinkSubtasksPerInstance = Integer.parseInt(args[6]); - } catch (Exception e) { - wrongArgs(); + LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO); + + try { + + JobGraph jG = getJobGraph(); + Configuration configuration = jG.getJobConfiguration(); + + if (args.length == 0) { + args = new String[] { "local" }; } - try { - JobGraph jG = getJobGraph(sourceSubtasks, sourceSubtasksPerInstance, - counterSubtasks, counterSubtasksPerInstance, sinkSubtasks, - sinkSubtasksPerInstance); - Configuration configuration = jG.getJobConfiguration(); + if (args[0].equals("local")) { + System.out.println("Running in Local mode"); + NepheleMiniCluster exec = new NepheleMiniCluster(); - if (args.length == 0) { - args = new String[] { "local" }; - } + exec.start(); - if (args[0].equals("local")) { - System.out.println("Running in Local mode"); - NepheleMiniCluster exec = new NepheleMiniCluster(); + Client client = new Client(new InetSocketAddress("localhost", 6498), configuration); - exec.start(); + client.run(jG, true); - Client client = new Client(new InetSocketAddress("localhost", 6498), - configuration); + exec.stop(); - client.run(jG, true); + } else if (args[0].equals("cluster")) { + System.out.println("Running in Cluster2 mode"); - exec.stop(); - } else if (args[0].equals("cluster")) { - System.out.println("Running in Cluster mode"); + Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), + configuration); - Client client = new Client(new InetSocketAddress("dell150", 6123), - configuration); - client.run(jG, true); - } + client.run(jG, true); - } catch (Exception e) { - System.out.println(e); } + + } catch (Exception e) { + System.out.println(e); } + } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java index 263d80e28f460b16cbc698e5e7b0f1c77f64d5bd..c61db134035b764e6e578fc15b9e5ffe0090c562 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java @@ -16,7 +16,6 @@ package eu.stratosphere.streaming.examples.wordcount; import java.io.BufferedReader; -import java.io.FileNotFoundException; import java.io.FileReader; import eu.stratosphere.api.java.tuple.Tuple1; @@ -31,27 +30,20 @@ public class WordCountSource extends UserSourceInvokable { @Override public void invoke() throws Exception { - - for (int i = 0; i < 2; i++) { - try { - br = new BufferedReader(new FileReader( - "/home/strato/stratosphere-distrib/resources/hamlet.txt")); - - line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); - while (line != null) { - if (line != "") { - outRecord.setString(0, line); - emit(outRecord); - performanceCounter.count(); - } - line = br.readLine(); - } - - } catch (FileNotFoundException e) { - e.printStackTrace(); + br = new BufferedReader(new FileReader( + "src/test/resources/testdata/hamlet.txt")); + while (true) { + line = br.readLine(); + if (line == null) { + break; } + if (line != "") { + line=line.replaceAll("[\\-\\+\\.\\^:,]", ""); + outRecord.setString(0, line); + emit(outRecord); + performanceCounter.count(); + } + line = br.readLine(); } - } - } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java old mode 100755 new mode 100644 index 534db04ee77af5d9c41f20688a9552d45c97f74b..9c5aa3e90371d6899920ea476619f47a9b2b686b --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSourceSplitter.java @@ -16,7 +16,6 @@ package eu.stratosphere.streaming.examples.wordcount; import java.io.BufferedReader; -import java.io.FileNotFoundException; import java.io.FileReader; import eu.stratosphere.api.java.tuple.Tuple1; @@ -31,28 +30,22 @@ public class WordCountSourceSplitter extends UserSourceInvokable { @Override public void invoke() throws Exception { - + br = new BufferedReader(new FileReader( + "src/test/resources/testdata/hamlet.txt")); while (true) { - try { - br = new BufferedReader(new FileReader( - "/home/strato/stratosphere-distrib/resources/hamlet.txt")); - line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", ""); - while (line != null) { - if (line != "") { - for (String word : line.split(" ")) { - outRecord.setString(0, word); - emit(outRecord); - performanceCounter.count(); - } - } - line = br.readLine(); + line = br.readLine(); + if (line == null) { + break; + } + if (line != "") { + line=line.replaceAll("[\\-\\+\\.\\^:,]", ""); + for (String word : line.split(" ")) { + outRecord.setString(0, word); + System.out.println("word=" + word); + emit(outRecord); + performanceCounter.count(); } - } catch (FileNotFoundException e) { - e.printStackTrace(); } - } - } - } \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java new file mode 100644 index 0000000000000000000000000000000000000000..4002701dad3f0ee8526242094c28f6f2ec33671d --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountStarter.java @@ -0,0 +1,115 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.examples.wordcount; + +import java.net.InetSocketAddress; + +import org.apache.log4j.Level; + +import eu.stratosphere.client.minicluster.NepheleMiniCluster; +import eu.stratosphere.client.program.Client; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.nephele.jobgraph.JobGraph; +import eu.stratosphere.streaming.api.JobGraphBuilder; +import eu.stratosphere.streaming.util.LogUtils; + +public class WordCountStarter { + + private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance, + int counterSubtasks, int counterSubtasksPerInstance, int sinkSubtasks, + int sinkSubtasksPerInstance) throws Exception { + JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph"); + graphBuilder.setSource("WordCountSourceSplitter", WordCountSourceSplitter.class, + sourceSubtasks, sourceSubtasksPerInstance); + graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks, + counterSubtasksPerInstance); + graphBuilder.setSink("WordCountSink", WordCountSink.class, sinkSubtasks, + sinkSubtasksPerInstance); + + graphBuilder.fieldsConnect("WordCountSourceSplitter", "WordCountCounter", 0); + graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink"); + + return graphBuilder.getJobGraph(); + } + + private static void wrongArgs() { + System.out + .println("USAGE:\n" + + "run "); + } + + // TODO: arguments check + public static void main(String[] args) { + + if (args.length != 7) { + wrongArgs(); + } else { + LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO); + + int sourceSubtasks = 1; + int sourceSubtasksPerInstance = 1; + int counterSubtasks = 1; + int counterSubtasksPerInstance = 1; + int sinkSubtasks = 1; + int sinkSubtasksPerInstance = 1; + + try { + sourceSubtasks = Integer.parseInt(args[1]); + sourceSubtasksPerInstance = Integer.parseInt(args[2]); + counterSubtasks = Integer.parseInt(args[3]); + counterSubtasksPerInstance = Integer.parseInt(args[4]); + sinkSubtasks = Integer.parseInt(args[5]); + sinkSubtasksPerInstance = Integer.parseInt(args[6]); + } catch (Exception e) { + wrongArgs(); + } + + try { + JobGraph jG = getJobGraph(sourceSubtasks, sourceSubtasksPerInstance, + counterSubtasks, counterSubtasksPerInstance, sinkSubtasks, + sinkSubtasksPerInstance); + Configuration configuration = jG.getJobConfiguration(); + + if (args.length == 0) { + args = new String[] { "local" }; + } + + if (args[0].equals("local")) { + System.out.println("Running in Local mode"); + NepheleMiniCluster exec = new NepheleMiniCluster(); + + exec.start(); + + Client client = new Client(new InetSocketAddress("localhost", 6498), + configuration); + + client.run(jG, true); + + exec.stop(); + } else if (args[0].equals("cluster")) { + System.out.println("Running in Cluster mode"); + + Client client = new Client(new InetSocketAddress("dell150", 6123), + configuration); + client.run(jG, true); + } + + } catch (Exception e) { + System.out.println(e); + } + } + } +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/index/BTreeIndex.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/index/BTreeIndex.java new file mode 100644 index 0000000000000000000000000000000000000000..38703669de5cac8ff04e074e71ebf5220a4247de --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/index/BTreeIndex.java @@ -0,0 +1,170 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.index; + +/** + * An easy open-sourced implementation of B-tree. + * Currently this implementation does not support duplicated key insert. + * This file would be reimplemented and optimized for in-memory state. + * Source code website: http://algs4.cs.princeton.edu/62btrees/BTree.java + */ +public class BTreeIndex, Value> { + private static final int M = 4; // max children per B-tree node = M-1 + + private Node root; // root of the B-tree + private int HT; // height of the B-tree + private int N; // number of key-value pairs in the B-tree + + // helper B-tree node data type + private static final class Node { + private int m; // number of children + private Entry[] children = new Entry[M]; // the array of children + private Node(int k) { m = k; } // create a node with k children + } + + // internal nodes: only use key and next + // external nodes: only use key and value + private static class Entry { + private Comparable key; + private Object value; + private Node next; // helper field to iterate over array entries + public Entry(Comparable key, Object value, Node next) { + this.key = key; + this.value = value; + this.next = next; + } + } + + // constructor + public BTreeIndex() { root = new Node(0); } + + // return number of key-value pairs in the B-tree + public int size() { return N; } + + // return height of B-tree + public int height() { return HT; } + + + // search for given key, return associated value; return null if no such key + public Value get(Key key) { return search(root, key, HT); } + private Value search(Node x, Key key, int ht) { + Entry[] children = x.children; + + // external node + if (ht == 0) { + for (int j = 0; j < x.m; j++) { + if (eq(key, children[j].key)) return (Value) children[j].value; + } + } + + // internal node + else { + for (int j = 0; j < x.m; j++) { + if (j+1 == x.m || less(key, children[j+1].key)) + return search(children[j].next, key, ht-1); + } + } + return null; + } + + + // insert key-value pair + // add code to check for duplicate keys + public void put(Key key, Value value) { + Node u = insert(root, key, value, HT); + N++; + if (u == null) return; + + // need to split root + Node t = new Node(2); + t.children[0] = new Entry(root.children[0].key, null, root); + t.children[1] = new Entry(u.children[0].key, null, u); + root = t; + HT++; + } + + + private Node insert(Node h, Key key, Value value, int ht) { + int j; + Entry t = new Entry(key, value, null); + + // external node + if (ht == 0) { + for (j = 0; j < h.m; j++) { + if (less(key, h.children[j].key)) break; + } + } + + // internal node + else { + for (j = 0; j < h.m; j++) { + if ((j+1 == h.m) || less(key, h.children[j+1].key)) { + Node u = insert(h.children[j++].next, key, value, ht-1); + if (u == null) return null; + t.key = u.children[0].key; + t.next = u; + break; + } + } + } + + for (int i = h.m; i > j; i--) h.children[i] = h.children[i-1]; + h.children[j] = t; + h.m++; + if (h.m < M) return null; + else return split(h); + } + + // split node in half + private Node split(Node h) { + Node t = new Node(M/2); + h.m = M/2; + for (int j = 0; j < M/2; j++) + t.children[j] = h.children[M/2+j]; + return t; + } + + // for debugging + public String toString() { + return toString(root, HT, "") + "\n"; + } + private String toString(Node h, int ht, String indent) { + String s = ""; + Entry[] children = h.children; + + if (ht == 0) { + for (int j = 0; j < h.m; j++) { + s += indent + children[j].key + " " + children[j].value + "\n"; + } + } + else { + for (int j = 0; j < h.m; j++) { + if (j > 0) s += indent + "(" + children[j].key + ")\n"; + s += toString(children[j].next, ht-1, indent + " "); + } + } + return s; + } + + // comparison functions - make Comparable instead of Key to avoid casts + private boolean less(Comparable k1, Comparable k2) { + return k1.compareTo(k2) < 0; + } + + private boolean eq(Comparable k1, Comparable k2) { + return k1.compareTo(k2) == 0; + } +} \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/index/IndexPair.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/index/IndexPair.java new file mode 100644 index 0000000000000000000000000000000000000000..839e28d9347b93ea68544a0aef15ee02553c254f --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/index/IndexPair.java @@ -0,0 +1,41 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.index; + +public class IndexPair{ + public IndexPair(int block, int entry){ + blockId=block; + entryId=entry; + } + + public IndexPair(IndexPair pair){ + blockId=pair.blockId; + entryId=pair.entryId; + } + + public void setIndexPair(int block, int entry){ + blockId=block; + entryId=entry; + } + + public void IncrementBlock(){ + blockId=blockId+1; + entryId=0; + } + + public int blockId; + public int entryId; +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableState.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableState.java new file mode 100644 index 0000000000000000000000000000000000000000..b647e48882d765505504f01be4852a1952a694dd --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableState.java @@ -0,0 +1,92 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.state; + +import java.util.ArrayList; +import java.util.HashMap; + +import eu.stratosphere.streaming.index.IndexPair; + +/** + * The log-structured key value store thats accept any modification operation by + * appending the value to the end of the state. + */ +public class LogTableState implements TableState { + + private HashMap hashMap = new HashMap(); + private HashMap> blockList = new HashMap>(); + private final int perBlockEntryCount = 1000; + private IndexPair nextInsertPos = new IndexPair(-1, -1); + + public LogTableState() { + blockList.put(0, new ArrayList()); + nextInsertPos.setIndexPair(0, 0); + } + + @Override + public void put(K key, V value) { + // TODO Auto-generated method stub + if (nextInsertPos.entryId == perBlockEntryCount) { + blockList.put(nextInsertPos.blockId + 1, new ArrayList()); + nextInsertPos.IncrementBlock(); + } + blockList.get(nextInsertPos.blockId).add(value); + hashMap.put(key, new IndexPair(nextInsertPos)); + nextInsertPos.entryId += 1; + } + + @Override + public V get(K key) { + // TODO Auto-generated method stub + IndexPair index = hashMap.get(key); + if (index == null) { + return null; + } else { + return blockList.get(index.blockId).get(index.entryId); + } + } + + @Override + public void delete(K key) { + // TODO Auto-generated method stub + hashMap.remove(key); + } + + @Override + public boolean containsKey(K key) { + // TODO Auto-generated method stub + return hashMap.containsKey(key); + } + + @Override + public String serialize() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void deserialize(String str) { + // TODO Auto-generated method stub + + } + + @Override + public TableStateIterator getIterator() { + // TODO Auto-generated method stub + return new LogTableStateIterator(hashMap.entrySet().iterator(), blockList); + } + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableStateIterator.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableStateIterator.java new file mode 100644 index 0000000000000000000000000000000000000000..0d97061a34c5b96f7e183fa22af1707363dd66d1 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/LogTableStateIterator.java @@ -0,0 +1,46 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.state; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map.Entry; + +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.streaming.index.IndexPair; + +public class LogTableStateIterator implements TableStateIterator{ + + private Iterator> iterator; + private HashMap> blockList; + public LogTableStateIterator(Iterator> iter, HashMap> blocks){ + iterator=iter; + blockList=blocks; + } + @Override + public boolean hasNext() { + // TODO Auto-generated method stub + return false; + } + + @Override + public Tuple2 next() { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableInternalState.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableState.java similarity index 80% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableInternalState.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableState.java index 8ef09e452a701904937d37824ba4fb918a979335..93bccbfac4306f11c2e35cd0782d0c5326a14280 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableInternalState.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableState.java @@ -21,7 +21,7 @@ import java.util.Map; /** * The most general internal state that stores data in a mutable map. */ -public class MutableInternalState implements InternalState { +public class MutableTableState implements TableState { private Map state=new LinkedHashMap(); @Override @@ -49,9 +49,21 @@ public class MutableInternalState implements InternalState { } @Override - public StateIterator getIterator() { + public MutableTableStateIterator getIterator() { // TODO Auto-generated method stub - return new MutableStateIterator(state.entrySet().iterator()); + return new MutableTableStateIterator(state.entrySet().iterator()); + } + + @Override + public String serialize() { + // TODO Auto-generated method stub + return null; + } + + @Override + public void deserialize(String str) { + // TODO Auto-generated method stub + } } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableStateIterator.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableStateIterator.java similarity index 90% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableStateIterator.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableStateIterator.java index b7efc0476ccb6193d55d88678e40a7cb01fe191d..e9c4d35567b5c871c6e09e12d9040dc191b0c630 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableStateIterator.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/MutableTableStateIterator.java @@ -20,10 +20,10 @@ import java.util.Map.Entry; import eu.stratosphere.api.java.tuple.Tuple2; -public class MutableStateIterator implements StateIterator{ +public class MutableTableStateIterator implements TableStateIterator{ private Iterator> iterator; - public MutableStateIterator(Iterator> iter){ + public MutableTableStateIterator(Iterator> iter){ iterator=iter; } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/InternalState.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/TableState.java similarity index 88% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/InternalState.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/TableState.java index 077d15388ee2ca9d9ecf47a2a84513ebe5b0cbd0..1cf5a75257c4623fe73adf730a99389780be250e 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/InternalState.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/TableState.java @@ -18,10 +18,12 @@ package eu.stratosphere.streaming.state; /** * An internal state interface that supports stateful operator. */ -public interface InternalState { +public interface TableState { public void put(K key, V value); public V get(K key); public void delete(K key); public boolean containsKey(K key); - public StateIterator getIterator(); + public String serialize(); + public void deserialize(String str); + public TableStateIterator getIterator(); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/StateIterator.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/TableStateIterator.java similarity index 96% rename from flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/StateIterator.java rename to flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/TableStateIterator.java index d098a65dc338d93070dcfb52f226276023f0b27d..8bac9b74f93f8210de92600459e0095d94867acc 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/StateIterator.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/TableStateIterator.java @@ -20,7 +20,7 @@ import eu.stratosphere.api.java.tuple.Tuple2; /** * the iterator for internal states. */ -public interface StateIterator{ +public interface TableStateIterator{ public boolean hasNext(); public Tuple2 next(); } diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowInternalState.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowInternalState.java deleted file mode 100644 index 09d5ccbdf4a4c863c49f420e8afe9a10c7962c38..0000000000000000000000000000000000000000 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowInternalState.java +++ /dev/null @@ -1,94 +0,0 @@ -/*********************************************************************************************************************** - * - * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - **********************************************************************************************************************/ - -package eu.stratosphere.streaming.state; - -import org.apache.commons.collections.buffer.CircularFifoBuffer; - -import eu.stratosphere.streaming.api.streamrecord.StreamRecord; - -/** - * The window state for window operator. To be general enough, this class - * implements a count based window operator. It is possible for the user to - * compose time based window operator by extending this class by splitting the - * stream into multiple mini batches. - */ -public class WindowInternalState implements InternalState { - private int currentRecordNum; - private int fullRecordNum; - private int slideRecordNum; - CircularFifoBuffer buffer; - - public WindowInternalState(int windowSize, int slidingStep) { - currentRecordNum = 0; - fullRecordNum = windowSize; - slideRecordNum = slidingStep; - buffer = new CircularFifoBuffer(windowSize); - } - - public void pushBack(StreamRecord records) { - buffer.add(records); - currentRecordNum += 1; - } - - public StreamRecord popFront() { - StreamRecord frontRecord=(StreamRecord) buffer.get(); - buffer.remove(); - return frontRecord; - } - - public boolean isFull() { - return currentRecordNum >= fullRecordNum; - } - - public boolean isComputable() { - if (currentRecordNum == fullRecordNum + slideRecordNum) { - currentRecordNum -= slideRecordNum; - return true; - } - return false; - } - - @Override - public void put(K key, StreamRecord value) { - // TODO Auto-generated method stub - - } - - @Override - public StreamRecord get(K key) { - // TODO Auto-generated method stub - return null; - } - - @Override - public void delete(K key) { - // TODO Auto-generated method stub - - } - - @Override - public boolean containsKey(K key) { - // TODO Auto-generated method stub - return false; - } - - @Override - public StateIterator getIterator() { - // TODO Auto-generated method stub - return null; - } - -} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java new file mode 100644 index 0000000000000000000000000000000000000000..732a0ffb6ab733f2e6fa396c5c1d081bec5a2c34 --- /dev/null +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowState.java @@ -0,0 +1,100 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.state; + +import java.util.HashMap; + +import org.apache.commons.collections.buffer.CircularFifoBuffer; + +import eu.stratosphere.streaming.api.streamrecord.StreamRecord; +import eu.stratosphere.streaming.index.IndexPair; + +/** + * The window state for window operator. To be general enough, this class + * implements a count based window operator. It is possible for the user to + * compose time based window operator by extending this class by splitting the + * stream into multiple mini batches. + */ +public class WindowState { + private int windowSize; + private int slidingStep; + private int computeGranularity; + private int windowFieldId; + + private int initTimestamp; + private int nextTimestamp; + private int currentRecordCount; + private int fullRecordCount; + private int slideRecordCount; + + HashMap windowIndex; + CircularFifoBuffer buffer; + StreamRecord tempRecord; + + public WindowState(int windowSize, int slidingStep, int computeGranularity, + int windowFieldId) { + this.windowSize = windowSize; + this.slidingStep = slidingStep; + this.computeGranularity = computeGranularity; + this.windowFieldId = windowFieldId; + + this.initTimestamp = -1; + this.nextTimestamp = -1; + this.currentRecordCount = 0; + // here we assume that windowSize and slidingStep is divisible by + // computeGranularity. + this.fullRecordCount = windowSize / computeGranularity; + this.slideRecordCount = slidingStep / computeGranularity; + + this.windowIndex = new HashMap(); + this.buffer = new CircularFifoBuffer(fullRecordCount); + } + + public void pushBack(StreamRecord record) { + if (initTimestamp == -1) { + initTimestamp = (Integer) record.getTuple(0).getField(windowFieldId); + nextTimestamp = initTimestamp + computeGranularity; + tempRecord = new StreamRecord(record.getNumOfFields()); + } + for (int i = 0; i < record.getNumOfTuples(); ++i) { + while ((Integer) record.getTuple(i).getField(windowFieldId) > nextTimestamp) { + buffer.add(tempRecord); + currentRecordCount += 1; + tempRecord = new StreamRecord(record.getNumOfFields()); + } + tempRecord.addTuple(record.getTuple(i)); + } + } + + public StreamRecord popFront() { + StreamRecord frontRecord = (StreamRecord) buffer.get(); + buffer.remove(); + return frontRecord; + } + + public boolean isFull() { + return currentRecordCount >= fullRecordCount; + } + + public boolean isComputable() { + if (currentRecordCount == fullRecordCount + slideRecordCount) { + currentRecordCount -= slideRecordCount; + return true; + } + return false; + } + +} diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowStateIterator.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowStateIterator.java index f1ad7927adad368422d658adefffcdb8848024d4..b43a17a992f78ad32c60828cd318427826e46db7 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowStateIterator.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/state/WindowStateIterator.java @@ -18,15 +18,13 @@ package eu.stratosphere.streaming.state; import eu.stratosphere.api.java.tuple.Tuple2; import eu.stratosphere.streaming.api.streamrecord.StreamRecord; -public class WindowStateIterator implements StateIterator{ +public class WindowStateIterator{ - @Override public boolean hasNext() { // TODO Auto-generated method stub return false; } - @Override public Tuple2 next() { // TODO Auto-generated method stub return null; diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/index/BTreeIndexTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/index/BTreeIndexTest.java new file mode 100644 index 0000000000000000000000000000000000000000..f361af31e7043d64bd09cb47129d32cde6b05ffb --- /dev/null +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/index/BTreeIndexTest.java @@ -0,0 +1,35 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.index; + +import org.junit.Test; + +import eu.stratosphere.streaming.index.BTreeIndex; +import eu.stratosphere.streaming.index.IndexPair; + +public class BTreeIndexTest { + + @Test + public void bTreeIndexOperationTest(){ + BTreeIndex btree=new BTreeIndex(); + btree.put("abc", new IndexPair(7, 3)); + btree.put("abc", new IndexPair(1, 2)); + btree.put("def", new IndexPair(6, 3)); + btree.put("ghi", new IndexPair(3, 6)); + btree.put("jkl", new IndexPair(4, 7)); + System.out.println(btree.get("abc").blockId+", "+btree.get("abc").entryId); + } +} diff --git a/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/state/InternalStateTest.java b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/state/InternalStateTest.java new file mode 100644 index 0000000000000000000000000000000000000000..dd858c3f760abe71cf17dd983503d01752eb5841 --- /dev/null +++ b/flink-addons/flink-streaming/src/test/java/eu/stratosphere/streaming/state/InternalStateTest.java @@ -0,0 +1,89 @@ +/*********************************************************************************************************************** + * + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + **********************************************************************************************************************/ + +package eu.stratosphere.streaming.state; + +import org.junit.Test; + +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.streaming.state.LogTableState; +import eu.stratosphere.streaming.state.MutableTableState; +import eu.stratosphere.streaming.state.TableStateIterator; +import eu.stratosphere.streaming.state.WindowState; + +public class InternalStateTest { + + @Test + public void MutableTableStateTest(){ + MutableTableState state=new MutableTableState(); + state.put("abc", "hello"); + state.put("test", "world"); + state.put("state", "mutable"); + state.put("streaming", "persist"); + String s=state.get("streaming"); + if(s==null){ + System.out.println("key does not exist!"); + } + else{ + System.out.println("value="+s); + } + s=state.get("null"); + if(s==null){ + System.out.println("key does not exist!"); + } + else{ + System.out.println("value="+s); + } + TableStateIterator iterator=state.getIterator(); + while(iterator.hasNext()){ + Tuple2 tuple=iterator.next(); + System.out.println(tuple.getField(0)+", "+tuple.getField(1)); + } + } + + @Test + public void LogTableStateTest(){ + LogTableState state=new LogTableState(); + state.put("abc", "hello"); + state.put("test", "world"); + state.put("state", "mutable"); + state.put("streaming", "persist"); + String s=state.get("streaming"); + if(s==null){ + System.out.println("key does not exist!"); + } + else{ + System.out.println("value="+s); + } + s=state.get("null"); + if(s==null){ + System.out.println("key does not exist!"); + } + else{ + System.out.println("value="+s); + } + TableStateIterator iterator=state.getIterator(); + while(iterator.hasNext()){ + Tuple2 tuple=iterator.next(); + System.out.println(tuple.getField(0)+", "+tuple.getField(1)); + } + } + + @Test + public void WindowStateTest(){ + WindowState state=new WindowState(100, 20, 10, 2); + + } +}