提交 fc149066 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] performance util refactor

上级 a2caec7e
......@@ -35,14 +35,14 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.streaming.util.PerformanceTracker;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountRemote {
private final static int recordsEmitted = 100000;
public static class WordCountDebugSource extends UserSourceInvokable {
private PerformanceTracker perf = new PerformanceTracker("SourceEmitCounter",1000, 10000);
private PerformanceCounter perf = new PerformanceCounter("SourceEmitCounter", 1000, 10000);
StreamRecord record = new StreamRecord(new Tuple1<String>());
......@@ -70,7 +70,7 @@ public class WordCountRemote {
public static class WordCountDebugSplitter extends UserTaskInvokable {
private PerformanceTracker perf = new PerformanceTracker("SplitterEmitCounter",1000, 10000);
private PerformanceCounter perf = new PerformanceCounter("SplitterEmitCounter", 1000, 10000);
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
......@@ -93,7 +93,7 @@ public class WordCountRemote {
}
public static class WordCountDebugCounter extends UserTaskInvokable {
private PerformanceTracker perf = new PerformanceTracker("CounterEmitCounter",1000, 10000);
private PerformanceCounter perf = new PerformanceCounter("CounterEmitCounter", 1000, 10000);
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
......@@ -127,7 +127,7 @@ public class WordCountRemote {
}
public static class WordCountDebugSink extends UserSinkInvokable {
private PerformanceTracker perf = new PerformanceTracker("SinkEmitCounter",1000, 10000);
private PerformanceCounter perf = new PerformanceCounter("SinkEmitCounter", 1000, 10000);
@Override
public void invoke(StreamRecord record) throws Exception {
......@@ -145,7 +145,7 @@ public class WordCountRemote {
graphBuilder.setSource("WordCountSource", WordCountDebugSource.class, 2, 1);
graphBuilder.setTask("WordCountSplitter", WordCountDebugSplitter.class, 2, 1);
graphBuilder.setTask("WordCountCounter", WordCountDebugCounter.class, 2, 1);
graphBuilder.setSink("WordCountSink", WordCountDebugSink.class,2,1);
graphBuilder.setSink("WordCountSink", WordCountDebugSink.class, 2, 1);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0);
......
package eu.stratosphere.streaming.util;
public class PerformanceCounter extends PerformanceTracker {
public PerformanceCounter(String name, int counterLength, int countInterval) {
super(name, counterLength, countInterval);
}
public PerformanceCounter(String name) {
super(name);
}
public void count(long i, String label) {
buffer = buffer + i;
intervalCounter++;
if (intervalCounter % interval == 0) {
intervalCounter = 0;
timeStamps.add(System.currentTimeMillis());
values.add(buffer);
labels.add(label);
}
}
public void count(long i) {
count(i, "counter");
}
public void count(String label) {
count(1, label);
}
public void count() {
count(1, "counter");
}
}
package eu.stratosphere.streaming.util;
public class PerformanceTimer extends PerformanceTracker {
long timer;
boolean millis;
public PerformanceTimer(String name, int counterLength, int countInterval) {
super(name, counterLength, countInterval);
}
public PerformanceTimer(String name) {
super(name);
}
public void startTimer(boolean millis) {
this.millis = millis;
if (millis) {
timer = System.currentTimeMillis();
} else {
timer = System.nanoTime();
}
}
public void startTimer() {
startTimer(true);
}
public void stopTimer(String label) {
if (millis) {
track(System.currentTimeMillis() - timer, label);
} else {
track(System.nanoTime() - timer, label);
}
}
public void stopTimer() {
stopTimer("timer");
}
}
......@@ -7,42 +7,39 @@ import java.util.List;
public class PerformanceTracker {
List<Long> timeStamps;
List<Long> values;
List<String> labels;
long counter;
long countInterval;
long intervalCounter;
long buffer;
long timer;
boolean millis;
String name;
protected List<Long> timeStamps;
protected List<Long> values;
protected List<String> labels;
public PerformanceTracker(String name) {
protected int interval;
protected int intervalCounter;
protected String name;
protected long buffer;
public PerformanceTracker(String name) {
timeStamps = new ArrayList<Long>();
values = new ArrayList<Long>();
labels = new ArrayList<String>();
this.countInterval = 1;
counter = 0;
this.interval = 1;
this.name = name;
buffer = 0;
}
public PerformanceTracker(String name, int counterLength, int countInterval) {
timeStamps = new ArrayList<Long>(counterLength);
values = new ArrayList<Long>(counterLength);
labels = new ArrayList<String>(counterLength);
this.countInterval = countInterval;
counter = 0;
public PerformanceTracker(String name, int capacity, int interval) {
timeStamps = new ArrayList<Long>(capacity);
values = new ArrayList<Long>(capacity);
labels = new ArrayList<String>(capacity);
this.interval = interval;
this.name = name;
buffer = 0;
}
public void track(Long value, String label) {
buffer = buffer + value;
intervalCounter++;
if (intervalCounter % countInterval == 0) {
if (intervalCounter % interval == 0) {
timeStamps.add(System.currentTimeMillis());
values.add(buffer);
......@@ -68,56 +65,6 @@ public class PerformanceTracker {
track(1);
}
public void startTimer(boolean millis) {
this.millis = millis;
if (millis) {
timer = System.currentTimeMillis();
} else {
timer = System.nanoTime();
}
}
public void startTimer() {
startTimer(true);
}
public void stopTimer(String label) {
if (millis) {
track(System.currentTimeMillis() - timer, label);
} else {
track(System.nanoTime() - timer, label);
}
}
public void stopTimer() {
stopTimer("timer");
}
public void count(long i, String label) {
counter = counter + i;
intervalCounter++;
if (intervalCounter % countInterval == 0) {
intervalCounter = 0;
timeStamps.add(System.currentTimeMillis());
values.add(counter);
labels.add(label);
}
}
public void count(long i) {
count(i, "counter");
}
public void count(String label) {
count(1, label);
}
public void count() {
count(1, "counter");
}
@Override
public String toString() {
StringBuilder csv = new StringBuilder();
......
......@@ -19,48 +19,53 @@ public class PerformanceTrackerTest {
@Test
public void testTrack() {
PerformanceTracker pT = new PerformanceTracker("testcounter");
PerformanceTracker pT = new PerformanceTracker("tracker");
pT.track();
pT.track(3);
pT.track(1);
assertEquals(2, pT.timeStamps.size());
assertEquals(2, pT.values.size());
assertEquals(3, pT.timeStamps.size());
assertEquals(3, pT.values.size());
assertEquals(Long.valueOf(1), pT.values.get(0));
assertEquals(Long.valueOf(3), pT.values.get(1));
assertEquals(Long.valueOf(1), pT.values.get(2));
PerformanceTracker pT2 = new PerformanceTracker("testcounter", 10, 2);
PerformanceTracker pT2 = new PerformanceTracker("tracker", 10, 2);
pT2.track(1);
pT2.track(3);
pT2.track(1);
pT2.track(3);
assertEquals(1, pT2.timeStamps.size());
assertEquals(1, pT2.values.size());
assertEquals(2, pT2.timeStamps.size());
assertEquals(2, pT2.values.size());
assertEquals(Long.valueOf(4), pT2.values.get(0));
assertEquals(Long.valueOf(4), pT2.values.get(1));
System.out.println(pT);
System.out.println(pT2);
System.out.println("--------------");
}
@Test
public void testCount() {
PerformanceTracker pT = new PerformanceTracker("testcounter");
pT.count();
pT.count(10);
pT.count();
PerformanceCounter pC = new PerformanceCounter("counter");
pC.count();
pC.count(10);
pC.count();
assertEquals(3, pT.timeStamps.size());
assertEquals(3, pT.values.size());
assertEquals(3, pC.timeStamps.size());
assertEquals(3, pC.values.size());
assertEquals(Long.valueOf(1), pT.values.get(0));
assertEquals(Long.valueOf(11), pT.values.get(1));
assertEquals(Long.valueOf(12), pT.values.get(2));
assertEquals(Long.valueOf(1), pC.values.get(0));
assertEquals(Long.valueOf(11), pC.values.get(1));
assertEquals(Long.valueOf(12), pC.values.get(2));
System.out.println(pT);
System.out.println(pC);
System.out.println("--------------");
PerformanceTracker pT2 = new PerformanceTracker("testcounter", 1000, 10000);
PerformanceCounter pT2 = new PerformanceCounter("counter", 1000, 10000);
for (int i = 0; i < 10000000; i++) {
pT2.count("test");
......@@ -74,7 +79,7 @@ public class PerformanceTrackerTest {
@Test
public void testTimer() throws InterruptedException {
PerformanceTracker pT = new PerformanceTracker("testcounter");
PerformanceTimer pT = new PerformanceTimer("timer");
pT.startTimer(true);
Thread.sleep(100);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册