提交 6f8f384b 编写于 作者: Y Yingjun Wu 提交者: Stephan Ewen

[streaming] initiate internal state for stateful operators

上级 e547f4f8
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
public class WordCountKvCounter extends UserTaskInvokable {
private MutableInternalState<String, Integer> wordCounts=new MutableInternalState<String, Integer>();
private String word = "";
private Integer count = 0;
// private StreamRecord streamRecord = new StreamRecord(2);
private int i = 0;
private long time;
private long prevTime = System.currentTimeMillis();
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
i++;
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Counter:\t" + i + "\t----Time: "
+ (time - prevTime));
prevTime = time;
}
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count=1;
wordCounts.put(word, 1);
}
outRecord.setString(0,word);
outRecord.setInteger(1,count);
emit(outRecord);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
/**
* An internal state interface that supports stateful operator.
*/
public interface InternalState<K, V> {
public void put(K key, V value);
public V get(K key);
public void delete(K key);
public boolean containsKey(K key);
public StateIterator<K, V> getIterator();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* The most general internal state that stores data in a mutable map.
*/
public class MutableInternalState<K, V> implements InternalState<K, V> {
private Map<K, V> state=new LinkedHashMap<K, V>();
@Override
public void put(K key, V value) {
// TODO Auto-generated method stub
state.put(key, value);
}
@Override
public V get(K key) {
// TODO Auto-generated method stub
return state.get(key);
}
@Override
public void delete(K key) {
// TODO Auto-generated method stub
state.remove(key);
}
@Override
public boolean containsKey(K key) {
// TODO Auto-generated method stub
return state.containsKey(key);
}
@Override
public StateIterator<K, V> getIterator() {
// TODO Auto-generated method stub
return new MutableStateIterator<K, V>(state.entrySet().iterator());
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.Iterator;
import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2;
public class MutableStateIterator<K, V> implements StateIterator<K, V>{
private Iterator<Entry<K, V>> iterator;
public MutableStateIterator(Iterator<Entry<K, V>> iter){
iterator=iter;
}
@Override
public boolean hasNext() {
// TODO Auto-generated method stub
return iterator.hasNext();
}
@Override
public Tuple2<K, V> next() {
// TODO Auto-generated method stub
Entry<K, V> entry=iterator.next();
return new Tuple2<K, V>(entry.getKey(), entry.getValue());
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import eu.stratosphere.api.java.tuple.Tuple2;
/**
* the iterator for internal states.
*/
public interface StateIterator<K, V>{
public boolean hasNext();
public Tuple2<K, V> next();
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册