提交 81fd2cfe 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] add licence

上级 fc149066
......@@ -15,29 +15,35 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
import eu.stratosphere.streaming.state.WindowInternalState;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize = 100;
private int slidingStep = 20;
private int windowSize;
private int slidingStep;
private WindowInternalState<Integer> window;
private MutableInternalState<String, Integer> wordCounts;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(new Tuple3<String, Integer, Long>());
private StreamRecord outRecord = new StreamRecord(
new Tuple3<String, Integer, Long>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
timestamp = record.getLong(1);
public WindowWordCountCounter() {
windowSize = 100;
slidingStep = 20;
window = new WindowInternalState<Integer>(windowSize, slidingStep);
wordCounts = new MutableInternalState<String, Integer>();
}
private void incrementCompute(StreamRecord record) {
word = record.getString(0);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -45,12 +51,41 @@ public class WindowWordCountCounter extends UserTaskInvokable {
count = 1;
wordCounts.put(word, 1);
}
}
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
private void decrementCompute(StreamRecord record) {
word = record.getString(0);
count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
} else {
wordCounts.put(word, count);
}
}
emit(outRecord);
@Override
public void invoke(StreamRecord record) throws Exception {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(record);
decrementCompute(expiredRecord);
window.pushBack(record);
if (window.isComputable()) {
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
emit(outRecord);
}
} else {
incrementCompute(record);
window.pushBack(record);
if(window.isFull()){
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
emit(outRecord);
}
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
public class WordCountKvCounter extends UserTaskInvokable {
private MutableInternalState<String, Integer> wordCounts=new MutableInternalState<String, Integer>();
private String word = "";
private Integer count = 0;
// private StreamRecord streamRecord = new StreamRecord(2);
private int i = 0;
private long time;
private long prevTime = System.currentTimeMillis();
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
i++;
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Counter:\t" + i + "\t----Time: "
+ (time - prevTime));
prevTime = time;
}
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count=1;
wordCounts.put(word, 1);
}
outRecord.setString(0,word);
outRecord.setInteger(1,count);
emit(outRecord);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
/**
* An internal state interface that supports stateful operator.
*/
public interface InternalState<K, V> {
public void put(K key, V value);
public V get(K key);
public void delete(K key);
public boolean containsKey(K key);
public StateIterator<K, V> getIterator();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* The most general internal state that stores data in a mutable map.
*/
public class MutableInternalState<K, V> implements InternalState<K, V> {
private Map<K, V> state=new LinkedHashMap<K, V>();
@Override
public void put(K key, V value) {
// TODO Auto-generated method stub
state.put(key, value);
}
@Override
public V get(K key) {
// TODO Auto-generated method stub
return state.get(key);
}
@Override
public void delete(K key) {
// TODO Auto-generated method stub
state.remove(key);
}
@Override
public boolean containsKey(K key) {
// TODO Auto-generated method stub
return state.containsKey(key);
}
@Override
public StateIterator<K, V> getIterator() {
// TODO Auto-generated method stub
return new MutableStateIterator<K, V>(state.entrySet().iterator());
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.Iterator;
import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2;
public class MutableStateIterator<K, V> implements StateIterator<K, V>{
private Iterator<Entry<K, V>> iterator;
public MutableStateIterator(Iterator<Entry<K, V>> iter){
iterator=iter;
}
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return iterator.hasNext();
}
@Override
public Tuple2<K, V> next() {
// TODO Auto-generated method stub
Entry<K, V> entry=iterator.next();
return new Tuple2<K, V>(entry.getKey(), entry.getValue());
}
}
......@@ -13,20 +13,14 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.window.wordcount;
package eu.stratosphere.streaming.state;
public class IncrementalWindow {
import eu.stratosphere.api.java.tuple.Tuple2;
private int currentTupleNum;
private int fullTupleNum;
private int slideTupleNum;
public IncrementalWindow(int batchRange, int windowSize, int slidingStep) {
}
void pushBack() {
}
void popFront() {
}
/**
* the iterator for internal states.
*/
public interface StateIterator<K, V>{
public boolean hasNext();
public Tuple2<K, V> next();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
* The window state for window operator. To be general enough, this class
* implements a count based window operator. It is possible for the user to
* compose time based window operator by extending this class by splitting the
* stream into multiple mini batches.
*/
public class WindowInternalState<K> implements InternalState<K, StreamRecord> {
private int currentRecordNum;
private int fullRecordNum;
private int slideRecordNum;
CircularFifoBuffer buffer;
public WindowInternalState(int windowSize, int slidingStep) {
currentRecordNum = 0;
fullRecordNum = windowSize;
slideRecordNum = slidingStep;
buffer = new CircularFifoBuffer(windowSize);
}
public void pushBack(StreamRecord records) {
buffer.add(records);
currentRecordNum += 1;
}
public StreamRecord popFront() {
StreamRecord frontRecord=(StreamRecord) buffer.get();
buffer.remove();
return frontRecord;
}
public boolean isFull() {
return currentRecordNum >= fullRecordNum;
}
public boolean isComputable() {
if (currentRecordNum == fullRecordNum + slideRecordNum) {
currentRecordNum -= slideRecordNum;
return true;
}
return false;
}
@Override
public void put(K key, StreamRecord value) {
// TODO Auto-generated method stub
}
@Override
public StreamRecord get(K key) {
// TODO Auto-generated method stub
return null;
}
@Override
public void delete(K key) {
// TODO Auto-generated method stub
}
@Override
public boolean containsKey(K key) {
// TODO Auto-generated method stub
return false;
}
@Override
public StateIterator<K, StreamRecord> getIterator() {
// TODO Auto-generated method stub
return null;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
public class WindowStateIterator<K> implements StateIterator<K, StreamRecord>{
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return false;
}
@Override
public Tuple2<K, StreamRecord> next() {
// TODO Auto-generated method stub
return null;
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.util;
public class PerformanceCounter extends PerformanceTracker {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.util;
public class PerformanceTimer extends PerformanceTracker {
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.util;
import java.io.FileNotFoundException;
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.util;
import static org.junit.Assert.*;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册