提交 cf27df66 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] update internal state abstraction, add index.

上级 bf335359
......@@ -15,19 +15,20 @@
package eu.stratosphere.streaming.examples.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private String word = "";
private Integer count = 0;
PerformanceCounter perf = new PerformanceCounter("CounterEmitCounter" + this.name, 1000, 1000,
"");
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
......@@ -47,11 +48,6 @@ public class WordCountCounter extends UserTaskInvokable {
outRecord.setInteger(1, count);
emit(outRecord);
}
@Override
public String getResult() {
return "";
perf.count();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.index;
/**
 * A small in-memory B-tree that maps comparable keys to values, adapted from
 * an open-source implementation.
 * Putting a key that is already present overwrites the stored value instead of
 * adding a second entry, so {@link #size()} counts distinct keys.
 * This file would be reimplemented and optimized for in-memory state.
 * Source code website: http://algs4.cs.princeton.edu/62btrees/BTree.java
 */
public class BTreeIndex<Key extends Comparable<Key>, Value> {
    // Capacity of a node's entry array. A node is split as soon as it holds M
    // entries, so between operations a node holds at most M-1 entries.
    private static final int M = 4;

    private Node root; // root of the B-tree
    private int HT;    // height of the B-tree (0 while the root is a leaf)
    private int N;     // number of key-value pairs in the B-tree

    // helper B-tree node data type
    private static final class Node {
        private int m;                                  // number of entries in use
        private final Entry[] children = new Entry[M];  // the array of entries

        // create a node with k entries in use
        private Node(int k) {
            m = k;
        }
    }

    // internal nodes: only use key and next
    // external nodes: only use key and value
    @SuppressWarnings("rawtypes")
    private static class Entry {
        private Comparable key;
        private Object value;
        private Node next; // subtree reached through this entry (internal nodes only)

        public Entry(Comparable key, Object value, Node next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }
    }

    /** Creates an empty index whose root is an empty leaf. */
    public BTreeIndex() {
        root = new Node(0);
    }

    /** @return the number of key-value pairs stored in the B-tree */
    public int size() {
        return N;
    }

    /** @return the height of the B-tree; 0 while the root is a leaf */
    public int height() {
        return HT;
    }

    /**
     * Looks up the value stored under the given key.
     *
     * @param key the key to search for
     * @return the associated value, or null if no such key is stored
     */
    public Value get(Key key) {
        return search(root, key, HT);
    }

    // Walks down from x (which sits ht levels above the leaves) to the leaf
    // responsible for key and scans it for an equal key.
    @SuppressWarnings("unchecked")
    private Value search(Node x, Key key, int ht) {
        Entry[] children = x.children;

        // external node
        if (ht == 0) {
            for (int j = 0; j < x.m; j++) {
                if (eq(key, children[j].key)) {
                    return (Value) children[j].value;
                }
            }
        }
        // internal node: descend into the last child whose successor key is greater
        else {
            for (int j = 0; j < x.m; j++) {
                if (j + 1 == x.m || less(key, children[j + 1].key)) {
                    return search(children[j].next, key, ht - 1);
                }
            }
        }
        return null;
    }

    /**
     * Stores the key-value pair. If the key is already present its value is
     * replaced and the size does not change.
     *
     * @param key the key; must not be null
     * @param value the value to associate with the key
     * @throws NullPointerException if key is null
     */
    public void put(Key key, Value value) {
        if (key == null) {
            // a null key would slip into an empty leaf and break every later comparison
            throw new NullPointerException("key must not be null");
        }
        Node u = insert(root, key, value, HT);
        if (u == null) {
            return;
        }

        // need to split root
        Node t = new Node(2);
        t.children[0] = new Entry(root.children[0].key, null, root);
        t.children[1] = new Entry(u.children[0].key, null, u);
        root = t;
        HT++;
    }

    // Inserts into the subtree rooted at h (ht levels above the leaves).
    // Returns the new right sibling if h had to be split, otherwise null.
    private Node insert(Node h, Key key, Value value, int ht) {
        int j;
        Entry t = new Entry(key, value, null);

        // external node
        if (ht == 0) {
            for (j = 0; j < h.m; j++) {
                if (eq(key, h.children[j].key)) {
                    // key already stored: overwrite in place, nothing to shift or split
                    h.children[j].value = value;
                    return null;
                }
                if (less(key, h.children[j].key)) {
                    break;
                }
            }
            // only a genuinely new key grows the index
            N++;
        }
        // internal node
        else {
            for (j = 0; j < h.m; j++) {
                if ((j + 1 == h.m) || less(key, h.children[j + 1].key)) {
                    Node u = insert(h.children[j++].next, key, value, ht - 1);
                    if (u == null) {
                        return null;
                    }
                    // the child was split: link its new sibling right after it
                    t.key = u.children[0].key;
                    t.next = u;
                    break;
                }
            }
        }

        // shift entries j..m-1 one slot to the right and place the new entry at j
        System.arraycopy(h.children, j, h.children, j + 1, h.m - j);
        h.children[j] = t;
        h.m++;
        if (h.m < M) {
            return null;
        }
        return split(h);
    }

    // split node in half: h keeps the lower half, the returned node gets the upper half
    private Node split(Node h) {
        Node t = new Node(M / 2);
        h.m = M / 2;
        for (int j = 0; j < M / 2; j++) {
            t.children[j] = h.children[M / 2 + j];
            // drop the moved reference so h does not keep it reachable
            h.children[M / 2 + j] = null;
        }
        return t;
    }

    // for debugging
    @Override
    public String toString() {
        StringBuilder out = new StringBuilder();
        appendTo(out, root, HT, "");
        return out.append("\n").toString();
    }

    // Appends the subtree rooted at h: leaves print "key value" lines, internal
    // nodes print their separator keys in parentheses between the child subtrees.
    private void appendTo(StringBuilder out, Node h, int ht, String indent) {
        Entry[] children = h.children;

        if (ht == 0) {
            for (int j = 0; j < h.m; j++) {
                out.append(indent).append(children[j].key).append(" ")
                        .append(children[j].value).append("\n");
            }
        } else {
            for (int j = 0; j < h.m; j++) {
                if (j > 0) {
                    out.append(indent).append("(").append(children[j].key).append(")\n");
                }
                appendTo(out, children[j].next, ht - 1, indent + " ");
            }
        }
    }

    // comparison functions - make Comparable instead of Key to avoid casts
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private boolean less(Comparable k1, Comparable k2) {
        return k1.compareTo(k2) < 0;
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    private boolean eq(Comparable k1, Comparable k2) {
        return k1.compareTo(k2) == 0;
    }
}
\ No newline at end of file
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.index;
/**
 * A mutable (block, entry) position: the id of a block together with the
 * offset of an entry inside that block.
 */
public class IndexPair {
    /** Id of the block this position points into. */
    public int blockId;
    /** Offset of the entry inside the block. */
    public int entryId;

    /**
     * Creates a position from its two coordinates.
     *
     * @param block the block id
     * @param entry the entry offset inside the block
     */
    public IndexPair(int block, int entry) {
        this.blockId = block;
        this.entryId = entry;
    }

    /**
     * Creates an independent copy of another position.
     *
     * @param pair the position to copy
     */
    public IndexPair(IndexPair pair) {
        this(pair.blockId, pair.entryId);
    }

    /**
     * Overwrites both coordinates of this position.
     *
     * @param block the new block id
     * @param entry the new entry offset
     */
    public void setIndexPair(int block, int entry) {
        this.blockId = block;
        this.entryId = entry;
    }

    /** Moves this position to the first entry of the following block. */
    public void IncrementBlock() {
        this.blockId += 1;
        this.entryId = 0;
    }
}
......@@ -15,6 +15,78 @@
package eu.stratosphere.streaming.state;
public class LogTableState {
import java.util.ArrayList;
import java.util.HashMap;
import eu.stratosphere.streaming.index.IndexPair;
/**
 * A log-structured key-value store that accepts every modification by
 * appending the value to the end of the state.
 */
public class LogTableState<K, V> implements TableState<K, V> {
    // number of values a block holds before a new block is started
    private final int blockCapacity = 1000;
    // key -> position of the most recently written value for that key
    private HashMap<K, IndexPair> keyIndex = new HashMap<K, IndexPair>();
    // block id -> values appended to that block, in write order
    private HashMap<Integer, ArrayList<V>> blocks = new HashMap<Integer, ArrayList<V>>();
    // position the next value will be written to
    private IndexPair writeCursor = new IndexPair(-1, -1);

    /** Creates an empty store with block 0 ready to receive the first value. */
    public LogTableState() {
        blocks.put(0, new ArrayList<V>());
        writeCursor.setIndexPair(0, 0);
    }

    /**
     * Appends the value to the log and points the key at the new position.
     * A value written earlier under the same key stays in its block.
     */
    @Override
    public void put(K key, V value) {
        // the current block is full: open the next one and move the cursor to its start
        if (writeCursor.entryId == blockCapacity) {
            blocks.put(writeCursor.blockId + 1, new ArrayList<V>());
            writeCursor.IncrementBlock();
        }
        ArrayList<V> currentBlock = blocks.get(writeCursor.blockId);
        currentBlock.add(value);
        // store a copy, because the cursor itself keeps moving
        keyIndex.put(key, new IndexPair(writeCursor));
        writeCursor.entryId += 1;
    }

    /**
     * Returns the value the key currently points at, or null if the key is not
     * in the index.
     */
    @Override
    public V get(K key) {
        IndexPair position = keyIndex.get(key);
        if (position != null) {
            return blocks.get(position.blockId).get(position.entryId);
        }
        return null;
    }

    /** Removes the key from the index; the logged value itself is not touched. */
    @Override
    public void delete(K key) {
        keyIndex.remove(key);
    }

    /** Tells whether the key is currently present in the index. */
    @Override
    public boolean containsKey(K key) {
        return keyIndex.containsKey(key);
    }

    /** Not implemented yet; always returns null. */
    @Override
    public String serialize() {
        return null;
    }

    /** Not implemented yet; does nothing. */
    @Override
    public void deserialize(String str) {
    }

    /** Returns an iterator built from the index entries and the block map. */
    @Override
    public TableStateIterator<K, V> getIterator() {
        return new LogTableStateIterator<K, V>(keyIndex.entrySet().iterator(), blocks);
    }
}
......@@ -15,6 +15,32 @@
package eu.stratosphere.streaming.state;
public class LogTableStateIterator {
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.index.IndexPair;
/**
 * Iterates over the key-value pairs of a log-structured table state by walking
 * the key index and resolving every index position against the block map.
 */
public class LogTableStateIterator<K, V> implements TableStateIterator<K ,V>{
    // walks the (key, position) entries of the index
    private final Iterator<Entry<K, IndexPair>> iterator;
    // block id -> values stored in that block
    private final HashMap<Integer, ArrayList<V>> blockList;

    /**
     * @param iter iterator over the (key, position) entries to visit
     * @param blocks block map the positions refer to
     */
    public LogTableStateIterator(Iterator<Entry<K, IndexPair>> iter, HashMap<Integer, ArrayList<V>> blocks){
        iterator=iter;
        blockList=blocks;
    }

    /** @return true while the underlying index iterator has entries left */
    @Override
    public boolean hasNext() {
        return iterator.hasNext();
    }

    /**
     * Advances to the next index entry and looks its value up in the block map.
     *
     * @return the key together with the value stored at the key's position
     */
    @Override
    public Tuple2<K, V> next() {
        Entry<K, IndexPair> entry = iterator.next();
        IndexPair position = entry.getValue();
        V value = blockList.get(position.blockId).get(position.entryId);
        return new Tuple2<K, V>(entry.getKey(), value);
    }
}
......@@ -15,9 +15,12 @@
package eu.stratosphere.streaming.state;
import java.util.HashMap;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.index.IndexPair;
/**
* The window state for window operator. To be general enough, this class
......@@ -30,42 +33,46 @@ public class WindowState<K> {
private int slidingStep;
private int computeGranularity;
private int windowFieldId;
private int initTimestamp;
private int nextTimestamp;
private int currentRecordNum;
private int fullRecordNum;
private int slideRecordNum;
private int currentRecordCount;
private int fullRecordCount;
private int slideRecordCount;
HashMap<K, IndexPair> windowIndex;
CircularFifoBuffer buffer;
StreamRecord tempRecord;
public WindowState(int windowSize, int slidingStep, int computeGranularity, int windowFieldId) {
public WindowState(int windowSize, int slidingStep, int computeGranularity,
int windowFieldId) {
this.windowSize = windowSize;
this.slidingStep = slidingStep;
this.computeGranularity = computeGranularity;
this.windowFieldId = windowFieldId;
this.initTimestamp = -1;
this.nextTimestamp = -1;
this.currentRecordNum = 0;
//here we assume that windowSize and slidingStep is divisible by computeGranularity.
this.fullRecordNum = windowSize / computeGranularity;
this.slideRecordNum = slidingStep / computeGranularity;
this.buffer = new CircularFifoBuffer(fullRecordNum);
this.currentRecordCount = 0;
// here we assume that windowSize and slidingStep is divisible by
// computeGranularity.
this.fullRecordCount = windowSize / computeGranularity;
this.slideRecordCount = slidingStep / computeGranularity;
this.windowIndex = new HashMap<K, IndexPair>();
this.buffer = new CircularFifoBuffer(fullRecordCount);
}
public void pushBack(StreamRecord record) {
if (initTimestamp == -1){
if (initTimestamp == -1) {
initTimestamp = record.getTuple(0).getField(windowFieldId);
nextTimestamp = initTimestamp + computeGranularity;
tempRecord = new StreamRecord(record.getNumOfFields());
}
for (int i = 0; i < record.getNumOfTuples(); ++i) {
while((Integer) record.getTuple(i).getField(windowFieldId) > nextTimestamp){
while ((Integer) record.getTuple(i).getField(windowFieldId) > nextTimestamp) {
buffer.add(tempRecord);
currentRecordNum += 1;
currentRecordCount += 1;
tempRecord = new StreamRecord(record.getNumOfFields());
}
tempRecord.addShadowTuple(record.getTuple(i));
......@@ -79,12 +86,12 @@ public class WindowState<K> {
}
public boolean isFull() {
return currentRecordNum >= fullRecordNum;
return currentRecordCount >= fullRecordCount;
}
public boolean isComputable() {
if (currentRecordNum == fullRecordNum + slideRecordNum) {
currentRecordNum -= slideRecordNum;
if (currentRecordCount == fullRecordCount + slideRecordCount) {
currentRecordCount -= slideRecordCount;
return true;
}
return false;
......
......@@ -13,41 +13,23 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountKvCounter extends UserTaskInvokable {
private MutableTableState<String, Integer> wordCounts = new MutableTableState<String, Integer>();
private String word = "";
private Integer count = 0;
PerformanceCounter perf = new PerformanceCounter("CounterEmitCounter" + this.name, 1000, 1000,
"");
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
}
outRecord.setString(0, word);
outRecord.setInteger(1, count);
emit(outRecord);
perf.count();
package eu.stratosphere.streaming.api.index;
import org.junit.Test;
import eu.stratosphere.streaming.index.BTreeIndex;
import eu.stratosphere.streaming.index.IndexPair;
public class BTreeIndexTest {
    /**
     * Fills a B-tree index with a few positions, writing the key "abc" twice,
     * and prints the position that a lookup of "abc" returns.
     */
    @Test
    public void bTreeIndexOperationTest() {
        String[] keys = { "abc", "abc", "def", "ghi", "jkl" };
        int[][] positions = { { 7, 3 }, { 1, 2 }, { 6, 3 }, { 3, 6 }, { 4, 7 } };

        BTreeIndex<String, IndexPair> index = new BTreeIndex<String, IndexPair>();
        for (int i = 0; i < keys.length; i++) {
            index.put(keys[i], new IndexPair(positions[i][0], positions[i][1]));
        }

        IndexPair found = index.get("abc");
        System.out.println(found.blockId + ", " + found.entryId);
    }
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.api.state;
import org.junit.Test;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.state.LogTableState;
import eu.stratosphere.streaming.state.MutableTableState;
import eu.stratosphere.streaming.state.TableStateIterator;
import eu.stratosphere.streaming.state.WindowState;
public class InternalStateTest {
    /** Writes four pairs into a mutable table state, then prints two lookups and all pairs. */
    @Test
    public void MutableTableStateTest() {
        MutableTableState<String, String> table = new MutableTableState<String, String>();
        table.put("abc", "hello");
        table.put("test", "world");
        table.put("state", "mutable");
        table.put("streaming", "persist");

        printLookup(table.get("streaming"));
        printLookup(table.get("null"));
        printAll(table.getIterator());
    }

    /** Writes four pairs into a log table state, then prints two lookups and all pairs. */
    @Test
    public void LogTableStateTest() {
        LogTableState<String, String> table = new LogTableState<String, String>();
        table.put("abc", "hello");
        table.put("test", "world");
        table.put("state", "mutable");
        table.put("streaming", "persist");

        printLookup(table.get("streaming"));
        printLookup(table.get("null"));
        printAll(table.getIterator());
    }

    /** Only checks that a window state can be constructed. */
    @Test
    public void WindowStateTest() {
        WindowState<String> window = new WindowState<String>(100, 20, 10, 2);
    }

    // prints the looked-up value, or a notice when the lookup returned null
    private static void printLookup(String value) {
        if (value != null) {
            System.out.println("value=" + value);
        } else {
            System.out.println("key does not exist!");
        }
    }

    // prints every pair the iterator yields as "key, value"
    private static void printAll(TableStateIterator<String, String> pairs) {
        while (pairs.hasNext()) {
            Tuple2<String, String> pair = pairs.next();
            System.out.println(pair.getField(0) + ", " + pair.getField(1));
        }
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册