提交 fdb55b54 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] performance tracker update and counter added to wordcount example

上级 9b46f380
......@@ -21,6 +21,7 @@ import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountCounter extends UserTaskInvokable {
......@@ -28,23 +29,15 @@ public class WordCountCounter extends UserTaskInvokable {
private String word = "";
private Integer count = 0;
// private StreamRecord streamRecord = new StreamRecord(2);
private int i = 0;
private long time;
private long prevTime = System.currentTimeMillis();
PerformanceCounter perf = new PerformanceCounter("CounterEmitCounter", 1000, 10000);
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
i++;
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Counter:\t" + i + "\t----Time: "
+ (time - prevTime));
prevTime = time;
}
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -57,5 +50,12 @@ public class WordCountCounter extends UserTaskInvokable {
outRecord.setInteger(1,count);
emit(outRecord);
perf.count();
}
@Override
public String getResult(){
perf.writeCSV("C:/temp/strato/Counter"+channelID+".csv");
return "";
}
}
......@@ -18,12 +18,12 @@ package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountDummySource2 extends UserSourceInvokable {
StreamRecord record = new StreamRecord(new Tuple1<String>());
private long time;
private long prevTime = System.currentTimeMillis();
PerformanceCounter perf = new PerformanceCounter("SourceEmitCounter", 1000, 10000);
public WordCountDummySource2() {
}
......@@ -31,19 +31,21 @@ public class WordCountDummySource2 extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 100000; i++) {
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Source:\t\t" + i + "\t\tTime: " + (time - prevTime));
prevTime = time;
}
for (int i = 0; i < 1000000; i++) {
if (i % 2 == 0) {
record.setString(0, "Gyula Marci");
record.setString(0, "Gyula Bela");
} else {
record.setString(0, "Gabor Frank");
}
emit(record);
perf.count();
}
}
@Override
public String getResult() {
perf.writeCSV("C:/temp/strato/Source"+channelID+".csv");
return "";
}
}
......@@ -19,6 +19,7 @@ import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.state.MutableInternalState;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountKvCounter extends UserTaskInvokable {
......@@ -26,23 +27,15 @@ public class WordCountKvCounter extends UserTaskInvokable {
private String word = "";
private Integer count = 0;
// private StreamRecord streamRecord = new StreamRecord(2);
private int i = 0;
private long time;
private long prevTime = System.currentTimeMillis();
PerformanceCounter perf = new PerformanceCounter("CounterEmitCounter"+this.name, 1000, 1000);
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
i++;
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Counter:\t" + i + "\t----Time: "
+ (time - prevTime));
prevTime = time;
}
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -55,5 +48,6 @@ public class WordCountKvCounter extends UserTaskInvokable {
outRecord.setInteger(1,count);
emit(outRecord);
perf.count();
}
}
......@@ -30,9 +30,9 @@ public class WordCountLocal {
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setSource("WordCountSource", WordCountDummySource2.class,1,1);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 3, 3);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
......
......@@ -17,26 +17,21 @@ package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountSink extends UserSinkInvokable {
int nrOfRecords = 0;
private long time;
private long prevTime = System.currentTimeMillis();
PerformanceCounter perf = new PerformanceCounter("SinkEmitCounter", 1000, 10000);
@Override
public void invoke(StreamRecord record) throws Exception {
nrOfRecords++;
if (nrOfRecords % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Sink:\t" + nrOfRecords + "\t----Time: " + (time - prevTime));
prevTime = time;
}
perf.count();
}
@Override
public String getResult() {
return String.valueOf(nrOfRecords);
perf.writeCSV("C:/temp/strato/Sink.csv");
return "";
}
}
......@@ -18,28 +18,28 @@ package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private int i = 0;
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
private long time;
private long prevTime = System.currentTimeMillis();
PerformanceCounter perf = new PerformanceCounter("SplitterEmitCounter", 1000, 10000);
@Override
public void invoke(StreamRecord record) throws Exception {
i++;
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Splitter:\t" + i + "\t----Time: " + (time - prevTime));
prevTime = time;
}
words = record.getString(0).split(" ");
for (String word : words) {
outputRecord.setString(0, word);
emit(outputRecord);
perf.count();
}
}
@Override
public String getResult() {
perf.writeCSV("C:/temp/strato/Splitter"+channelID+".csv");
return "";
}
}
\ No newline at end of file
......@@ -93,10 +93,10 @@ public class PerformanceTracker {
return csv.toString();
}
public void writeCSV(String file) {
public void writeCSV(String fname) {
try {
PrintWriter out = new PrintWriter(file);
PrintWriter out = new PrintWriter(fname);
out.print(toString());
out.close();
......
......@@ -9,30 +9,53 @@ import matplotlib.pyplot as plt
import pandas as pd
import os
def plotPerformance(csv_dir):
csv_dir=csv_dir
def readFiles(csv_dir):
dataframes={}
for fname in os.listdir(csv_dir):
if '.csv' in fname:
df=pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]))
df['speed']=speed
dataframes[fname.rstrip('.csv')]=df
dataframes[fname.rstrip('.csv')]=pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')
return dataframes
def plotCounter(csv_dir, smooth=5):
dataframes= readFiles(csv_dir)
for name in dataframes:
df=dataframes[name]
speed=[0]
values=list(df.ix[:,0])
for i in range(1,len(values)):
speed.append(float(values[i]-values[i-1])/float(df.index[i]-df.index[i-1]))
df['speed']=speed
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Values')
plt.title('Counter')
for name in dataframes.keys():
for name in dataframes:
dataframes[name].ix[:,0].plot()
plt.legend(dataframes.keys())
plt.figure(figsize=(12, 8), dpi=80)
plt.title('dV/dT')
plt.title('dC/dT')
for name in dataframes:
pd.rolling_mean(dataframes[name].speed,smooth).plot()
plt.legend(dataframes.keys())
def plotTimer(csv_dir,smooth=5,std=50):
dataframes= readFiles(csv_dir)
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Timer')
for name in dataframes:
pd.rolling_mean(dataframes[name].ix[:,0],smooth).plot()
plt.legend(dataframes.keys())
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Standard deviance')
for name in dataframes.keys():
dataframes[name].speed.plot()
for name in dataframes:
pd.rolling_std(dataframes[name].ix[:,0],std).plot()
plt.legend(dataframes.keys())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册