提交 a2caec7e 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] timer added to performance tracker

上级 98967ea7
......@@ -13,14 +13,20 @@
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
package eu.stratosphere.streaming.examples.window.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
public class IncrementalWindow {
/**
* the iterator for internal states.
*/
public interface StateIterator<K, V>{
public boolean hasNext();
public Tuple2<K, V> next();
private int currentTupleNum;
private int fullTupleNum;
private int slideTupleNum;
public IncrementalWindow(int batchRange, int windowSize, int slidingStep) {
}
void pushBack() {
}
void popFront() {
}
}
......@@ -15,35 +15,29 @@
package eu.stratosphere.streaming.examples.window.wordcount;
import java.util.HashMap;
import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
import eu.stratosphere.streaming.state.WindowInternalState;
public class WindowWordCountCounter extends UserTaskInvokable {
private int windowSize;
private int slidingStep;
private WindowInternalState<Integer> window;
private MutableInternalState<String, Integer> wordCounts;
private int windowSize = 100;
private int slidingStep = 20;
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
private Integer count = 0;
private Long timestamp = 0L;
private StreamRecord outRecord = new StreamRecord(
new Tuple3<String, Integer, Long>());
private StreamRecord outRecord = new StreamRecord(new Tuple3<String, Integer, Long>());
public WindowWordCountCounter() {
windowSize = 100;
slidingStep = 20;
window = new WindowInternalState<Integer>(windowSize, slidingStep);
wordCounts = new MutableInternalState<String, Integer>();
}
private void incrementCompute(StreamRecord record) {
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
timestamp = record.getLong(1);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -51,41 +45,12 @@ public class WindowWordCountCounter extends UserTaskInvokable {
count = 1;
wordCounts.put(word, 1);
}
}
private void decrementCompute(StreamRecord record) {
word = record.getString(0);
count = wordCounts.get(word) - 1;
if (count == 0) {
wordCounts.delete(word);
} else {
wordCounts.put(word, count);
}
}
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
@Override
public void invoke(StreamRecord record) throws Exception {
if (window.isFull()) {
StreamRecord expiredRecord = window.popFront();
incrementCompute(record);
decrementCompute(expiredRecord);
window.pushBack(record);
if (window.isComputable()) {
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
emit(outRecord);
}
} else {
incrementCompute(record);
window.pushBack(record);
if(window.isFull()){
outRecord.setString(0, word);
outRecord.setInteger(1, count);
outRecord.setLong(2, timestamp);
emit(outRecord);
}
}
emit(outRecord);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
/**
 * Word-count task that keeps a running count per word in a
 * {@link MutableInternalState} and emits a (word, count) record for every
 * incoming record. It also prints a throughput line to standard output every
 * 50000 processed records.
 */
public class WordCountKvCounter extends UserTaskInvokable {

	/** Running count for every word seen so far. */
	private MutableInternalState<String, Integer> wordCounts = new MutableInternalState<String, Integer>();

	/** Word read from the record currently being processed. */
	private String word = "";
	/** Updated count of {@link #word}. */
	private Integer count = 0;

	/** Number of records processed by this instance. */
	private int i = 0;
	/** Wall-clock time (ms) of the most recent throughput report. */
	private long time;
	/** Wall-clock time (ms) of the previous throughput report (initially: construction time). */
	private long prevTime = System.currentTimeMillis();

	/** Output record that is refilled and emitted on every call. */
	private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());

	/**
	 * Increments the count of the word stored in field 0 of {@code record}
	 * and emits the word together with its new count.
	 *
	 * @param record
	 *            incoming record; field 0 is read as the word
	 */
	@Override
	public void invoke(StreamRecord record) throws Exception {
		word = record.getString(0);
		reportProgress();

		// First occurrence starts at 1, otherwise add one to the stored count.
		count = wordCounts.containsKey(word) ? wordCounts.get(word) + 1 : 1;
		wordCounts.put(word, count);

		outRecord.setString(0, word);
		outRecord.setInteger(1, count);
		emit(outRecord);
	}

	/**
	 * Counts the current record and, on every 50000th one, prints the record
	 * counter and the milliseconds elapsed since the previous report.
	 */
	private void reportProgress() {
		i++;
		if (i % 50000 != 0) {
			return;
		}
		time = System.currentTimeMillis();
		System.out.println("Counter:\t" + i + "\t----Time: "
				+ (time - prevTime));
		prevTime = time;
	}
}
......@@ -42,7 +42,7 @@ public class WordCountRemote {
public static class WordCountDebugSource extends UserSourceInvokable {
private PerformanceTracker perf = new PerformanceTracker(1000, 10000);
private PerformanceTracker perf = new PerformanceTracker("SourceEmitCounter",1000, 10000);
StreamRecord record = new StreamRecord(new Tuple1<String>());
......@@ -64,13 +64,13 @@ public class WordCountRemote {
@Override
public String getResult() {
return perf.createCSV();
return perf.toString();
}
}
public static class WordCountDebugSplitter extends UserTaskInvokable {
private PerformanceTracker perf = new PerformanceTracker(1000, 10000);
private PerformanceTracker perf = new PerformanceTracker("SplitterEmitCounter",1000, 10000);
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
......@@ -88,12 +88,12 @@ public class WordCountRemote {
@Override
public String getResult() {
return perf.createCSV();
return perf.toString();
}
}
public static class WordCountDebugCounter extends UserTaskInvokable {
private PerformanceTracker perf = new PerformanceTracker(1000, 10000);
private PerformanceTracker perf = new PerformanceTracker("CounterEmitCounter",1000, 10000);
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
......@@ -122,12 +122,12 @@ public class WordCountRemote {
@Override
public String getResult() {
return perf.createCSV();
return perf.toString();
}
}
public static class WordCountDebugSink extends UserSinkInvokable {
private PerformanceTracker perf = new PerformanceTracker(1000, 10000);
private PerformanceTracker perf = new PerformanceTracker("SinkEmitCounter",1000, 10000);
@Override
public void invoke(StreamRecord record) throws Exception {
......@@ -136,7 +136,7 @@ public class WordCountRemote {
@Override
public String getResult() {
return perf.createCSV();
return perf.toString();
}
}
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
/**
 * Key/value state contract used by stateful operators.
 *
 * @param <K>
 *            key type
 * @param <V>
 *            value type
 */
public interface InternalState<K, V> {

	/**
	 * Associates {@code value} with {@code key}.
	 *
	 * @param key
	 *            key to store under
	 * @param value
	 *            value to store
	 */
	void put(K key, V value);

	/**
	 * Looks up the value stored for {@code key}.
	 *
	 * @param key
	 *            key to look up
	 * @return the value for {@code key}; what is returned for an unknown key
	 *         is up to the implementation
	 */
	V get(K key);

	/**
	 * Removes the entry stored for {@code key}.
	 *
	 * @param key
	 *            key to remove
	 */
	void delete(K key);

	/**
	 * Tells whether an entry exists for {@code key}.
	 *
	 * @param key
	 *            key to test
	 * @return {@code true} if the state holds an entry for {@code key}
	 */
	boolean containsKey(K key);

	/**
	 * Gives access to the stored entries.
	 *
	 * @return an iterator over the entries of this state
	 */
	StateIterator<K, V> getIterator();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.LinkedHashMap;
import java.util.Map;
/**
 * General-purpose internal state that keeps its entries in a mutable,
 * insertion-ordered map.
 *
 * @param <K>
 *            key type
 * @param <V>
 *            value type
 */
public class MutableInternalState<K, V> implements InternalState<K, V> {

	/** Backing store; a {@link LinkedHashMap}, so iteration follows insertion order. */
	private final Map<K, V> entries = new LinkedHashMap<K, V>();

	/** Stores {@code value} under {@code key}, replacing any earlier value. */
	@Override
	public void put(K key, V value) {
		entries.put(key, value);
	}

	/** Returns the value stored for {@code key}, or {@code null} if the map has none. */
	@Override
	public V get(K key) {
		return entries.get(key);
	}

	/** Removes the entry for {@code key}; does nothing if the key is absent. */
	@Override
	public void delete(K key) {
		entries.remove(key);
	}

	/** Returns {@code true} if an entry exists for {@code key}. */
	@Override
	public boolean containsKey(K key) {
		return entries.containsKey(key);
	}

	/** Returns a new iterator over the current entries of the backing map. */
	@Override
	public StateIterator<K, V> getIterator() {
		MutableStateIterator<K, V> stateIterator = new MutableStateIterator<K, V>(
				entries.entrySet().iterator());
		return stateIterator;
	}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import java.util.Iterator;
import java.util.Map.Entry;
import eu.stratosphere.api.java.tuple.Tuple2;
/**
 * {@link StateIterator} that adapts an iterator over map entries, handing out
 * each entry as a {@link Tuple2} of key and value.
 *
 * @param <K>
 *            key type
 * @param <V>
 *            value type
 */
public class MutableStateIterator<K, V> implements StateIterator<K, V> {

	/** Entry iterator that all calls are forwarded to. */
	private Iterator<Entry<K, V>> delegate;

	/**
	 * @param iter
	 *            entry iterator to wrap
	 */
	public MutableStateIterator(Iterator<Entry<K, V>> iter) {
		this.delegate = iter;
	}

	/** Returns {@code true} while the wrapped iterator has more entries. */
	@Override
	public boolean hasNext() {
		return delegate.hasNext();
	}

	/** Advances the wrapped iterator and returns the entry as a new (key, value) tuple. */
	@Override
	public Tuple2<K, V> next() {
		Entry<K, V> current = delegate.next();
		K key = current.getKey();
		V value = current.getValue();
		return new Tuple2<K, V>(key, value);
	}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.streaming.state;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
/**
 * Window state for window operators. To stay general, this class implements a
 * count-based window. Users can build a time-based window on top of it by
 * extending this class and splitting the stream into multiple mini batches.
 *
 * @param <K>
 *            key type of the {@link InternalState} contract; the key-based
 *            methods are not implemented by this class
 */
public class WindowInternalState<K> implements InternalState<K, StreamRecord> {

	/** Records counted so far; reduced by the slide size in {@link #isComputable()}. */
	private int currentRecordNum;
	/** Window size, in records. */
	private int fullRecordNum;
	/** Sliding step, in records. */
	private int slideRecordNum;

	/** Fixed-capacity FIFO buffer holding the records of the window. */
	CircularFifoBuffer buffer;

	/**
	 * @param windowSize
	 *            number of records in a full window; also the buffer capacity
	 * @param slidingStep
	 *            number of records the window advances by
	 */
	public WindowInternalState(int windowSize, int slidingStep) {
		this.currentRecordNum = 0;
		this.fullRecordNum = windowSize;
		this.slideRecordNum = slidingStep;
		this.buffer = new CircularFifoBuffer(windowSize);
	}

	/**
	 * Appends a record to the buffer and counts it.
	 *
	 * @param records
	 *            record to append
	 */
	public void pushBack(StreamRecord records) {
		buffer.add(records);
		currentRecordNum++;
	}

	/**
	 * Removes the oldest record from the buffer. The record counter is left
	 * untouched.
	 *
	 * @return the removed record
	 */
	public StreamRecord popFront() {
		// remove() hands back the element it removes, i.e. the current front.
		return (StreamRecord) buffer.remove();
	}

	/**
	 * @return {@code true} once the record counter has reached the window size
	 */
	public boolean isFull() {
		return currentRecordNum >= fullRecordNum;
	}

	/**
	 * Checks whether the window has advanced by one full slide. When it has,
	 * the record counter is reduced by the slide size as a side effect.
	 *
	 * @return {@code true} exactly when the counter equals window size plus
	 *         sliding step
	 */
	public boolean isComputable() {
		if (currentRecordNum != fullRecordNum + slideRecordNum) {
			return false;
		}
		currentRecordNum -= slideRecordNum;
		return true;
	}

	/** Not implemented: does nothing. */
	@Override
	public void put(K key, StreamRecord value) {
	}

	/** Not implemented: always returns {@code null}. */
	@Override
	public StreamRecord get(K key) {
		return null;
	}

	/** Not implemented: does nothing. */
	@Override
	public void delete(K key) {
	}

	/** Not implemented: always returns {@code false}. */
	@Override
	public boolean containsKey(K key) {
		return false;
	}

	/** Not implemented: always returns {@code null}. */
	@Override
	public StateIterator<K, StreamRecord> getIterator() {
		return null;
	}
}
package eu.stratosphere.streaming.state;
/**
 * Empty class: it currently declares no fields or methods.
 *
 * NOTE(review): presumably a placeholder for an iterator over
 * WindowInternalState - confirm before relying on it.
 */
public class WindowStateIterator {
}
......@@ -12,32 +12,48 @@ public class PerformanceTracker {
List<String> labels;
long counter;
long countInterval;
long counts;
long intervalCounter;
long buffer;
long timer;
boolean millis;
String name;
public PerformanceTracker(String name) {
public PerformanceTracker() {
timeStamps = new ArrayList<Long>();
values = new ArrayList<Long>();
labels = new ArrayList<String>();
this.countInterval = 1;
counter = 0;
this.name = name;
buffer = 0;
}
public PerformanceTracker(int counterLength, int countInterval) {
public PerformanceTracker(String name, int counterLength, int countInterval) {
timeStamps = new ArrayList<Long>(counterLength);
values = new ArrayList<Long>(counterLength);
labels = new ArrayList<String>(counterLength);
this.countInterval = countInterval;
counter = 0;
this.name = name;
}
public void track(Long value, String label) {
timeStamps.add(System.currentTimeMillis());
values.add(value);
labels.add(label);
buffer = buffer + value;
intervalCounter++;
if (intervalCounter % countInterval == 0) {
timeStamps.add(System.currentTimeMillis());
values.add(buffer);
labels.add(label);
buffer = 0;
intervalCounter = 0;
}
}
public void track(Long value) {
track(value, "");
track(value, "tracker");
}
public void track(int value, String label) {
......@@ -45,18 +61,45 @@ public class PerformanceTracker {
}
public void track(int value) {
track(Long.valueOf(value), "");
track(Long.valueOf(value), "tracker");
}
/**
 * Tracks the value 1 by delegating to {@code track(1)}.
 */
public void track() {
track(1);
}
/**
 * Starts the timer by storing the current clock reading.
 *
 * @param millis
 *            {@code true} to read {@link System#currentTimeMillis()},
 *            {@code false} to read {@link System#nanoTime()}; the choice is
 *            remembered so that {@code stopTimer} reads the same clock
 */
public void startTimer(boolean millis) {
	this.millis = millis;
	timer = millis ? System.currentTimeMillis() : System.nanoTime();
}
/**
 * Starts the timer in millisecond mode by delegating to
 * {@code startTimer(true)}.
 */
public void startTimer() {
startTimer(true);
}
/**
 * Tracks the time elapsed since the stored timer value, using the clock
 * selected by the {@code millis} flag (milliseconds if set, nanoseconds
 * otherwise).
 *
 * @param label
 *            label passed on to {@code track} together with the elapsed time
 */
public void stopTimer(String label) {
	long now = millis ? System.currentTimeMillis() : System.nanoTime();
	track(now - timer, label);
}
/**
 * Stops the timer using the label "timer" by delegating to
 * {@code stopTimer("timer")}.
 */
public void stopTimer() {
stopTimer("timer");
}
public void count(long i, String label) {
counter = counter + i;
counts++;
if (counts % countInterval == 0) {
counts = 0;
intervalCounter++;
if (intervalCounter % countInterval == 0) {
intervalCounter = 0;
timeStamps.add(System.currentTimeMillis());
values.add(counter);
labels.add(label);
......@@ -64,7 +107,7 @@ public class PerformanceTracker {
}
public void count(long i) {
count(i, "");
count(i, "counter");
}
public void count(String label) {
......@@ -72,13 +115,14 @@ public class PerformanceTracker {
}
public void count() {
count(1, "");
count(1, "counter");
}
public String createCSV() {
@Override
public String toString() {
StringBuilder csv = new StringBuilder();
csv.append("Time,Value,Label\n");
csv.append("Time," + name + ",Label\n");
for (int i = 0; i < timeStamps.size(); i++) {
csv.append(timeStamps.get(i) + "," + values.get(i) + "," + labels.get(i) + "\n");
......@@ -91,7 +135,7 @@ public class PerformanceTracker {
try {
PrintWriter out = new PrintWriter(file);
out.print(createCSV());
out.print(toString());
out.close();
} catch (FileNotFoundException e) {
......
......@@ -19,7 +19,7 @@ public class PerformanceTrackerTest {
@Test
public void testTrack() {
PerformanceTracker pT = new PerformanceTracker();
PerformanceTracker pT = new PerformanceTracker("testcounter");
pT.track();
pT.track(3);
......@@ -29,14 +29,23 @@ public class PerformanceTrackerTest {
assertEquals(Long.valueOf(1), pT.values.get(0));
assertEquals(Long.valueOf(3), pT.values.get(1));
System.out.println(pT.createCSV());
PerformanceTracker pT2 = new PerformanceTracker("testcounter", 10, 2);
pT2.track(1);
pT2.track(3);
assertEquals(1, pT2.timeStamps.size());
assertEquals(1, pT2.values.size());
assertEquals(Long.valueOf(4), pT2.values.get(0));
System.out.println(pT);
System.out.println("--------------");
}
@Test
public void testCount() {
PerformanceTracker pT = new PerformanceTracker();
PerformanceTracker pT = new PerformanceTracker("testcounter");
pT.count();
pT.count(10);
pT.count();
......@@ -48,19 +57,34 @@ public class PerformanceTrackerTest {
assertEquals(Long.valueOf(11), pT.values.get(1));
assertEquals(Long.valueOf(12), pT.values.get(2));
System.out.println(pT.createCSV());
System.out.println(pT);
System.out.println("--------------");
PerformanceTracker pT2 = new PerformanceTracker(1000,10000);
for(int i=0;i<10000000;i++){
PerformanceTracker pT2 = new PerformanceTracker("testcounter", 1000, 10000);
for (int i = 0; i < 10000000; i++) {
pT2.count("test");
}
assertEquals(1000, pT2.timeStamps.size());
//pT2.writeCSV("C:/temp/test.csv");
// pT2.writeCSV("C:/temp/test.csv");
}
/**
 * Checks that one start/stop cycle of the millisecond timer records exactly
 * one entry whose value is close to the time actually slept.
 *
 * The bounds are deliberately loose: the thread may wake up well after the
 * requested 100 ms on a busy machine, so a tight upper bound makes the test
 * flaky. The lower bound guards against a timer that measures nothing.
 */
@Test
public void testTimer() throws InterruptedException {
	PerformanceTracker pT = new PerformanceTracker("testcounter");

	pT.startTimer(true);
	Thread.sleep(100);
	pT.stopTimer();

	assertEquals(1, pT.timeStamps.size());
	assertEquals(1, pT.values.size());

	long elapsedMillis = pT.values.get(0);
	assertTrue("unexpected elapsed time: " + elapsedMillis + " ms",
			elapsedMillis >= 50 && elapsedMillis < 1000);

	System.out.println(pT);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册