diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java index e0cfbe271793baa496fb4c64e82bb90a374fcbd3..b8e8c0632ea1b3fbfd1cfaea7b1c1d814795eb2a 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountCounter.java @@ -30,8 +30,8 @@ public class WordCountCounter extends UserTaskInvokable { private String word = ""; private Integer count = 0; - PerformanceCounter pCounter = new PerformanceCounter("CounterEmitCounter", 1000, 10000); - PerformanceTimer pTimer = new PerformanceTimer("CounterEmitTimer", 1000, 10000, true); + PerformanceCounter pCounter = new PerformanceCounter("CounterEmitCounter", 1000, 1000); + PerformanceTimer pTimer = new PerformanceTimer("CounterEmitTimer", 1000, 1000, true); private StreamRecord outRecord = new StreamRecord(new Tuple2()); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java index 0947a775814901cb77352eeabb03fab496304e7b..4acb7725ca65b9c74d45352e2630c1764b19fb47 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSink.java @@ -21,7 +21,7 @@ import eu.stratosphere.streaming.util.PerformanceCounter; public class WordCountSink extends UserSinkInvokable { - PerformanceCounter perf = new PerformanceCounter("SinkEmitCounter", 1000, 10000); + PerformanceCounter perf = new PerformanceCounter("SinkEmitCounter", 1000, 1000); @Override public void invoke(StreamRecord record) throws Exception { diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java index 4c473c13b1fd4ad5ceb0a33572dacf9dc7924a18..638a1faeabf8fc4be600163dec9b7d4fef58b194 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSource.java @@ -37,7 +37,7 @@ public class WordCountSource extends UserSourceInvokable { @Override public void invoke() throws Exception { - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 2; i++) { try { br = new BufferedReader(new FileReader("/home/strato/strato-dist/resources/hamlet.txt")); diff --git a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java index f1b815636b6ad0ab892d4d70ef89e956e46036dc..9d313b0965be7f8993982cba3d2360d57e348cc3 100644 --- a/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java +++ b/flink-addons/flink-streaming/src/main/java/eu/stratosphere/streaming/examples/wordcount/WordCountSplitter.java @@ -25,8 +25,8 @@ public class WordCountSplitter extends UserTaskInvokable { private String[] words = new String[] {}; private StreamRecord outputRecord = new StreamRecord(new Tuple1()); - PerformanceCounter pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 10000); - PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000, 10000, true); + PerformanceCounter pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 1000); + PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000, 1000, true); @Override public void invoke(StreamRecord record) throws Exception { diff --git a/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py b/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py index eb6df8bb804752e4f303c45007d77515155aa005..b845d9dce384e075b0b50db468d8590b7bd4f515 100755 --- a/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py +++ b/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py @@ -12,21 +12,20 @@ import os linestyles = ['_', '-', '--', ':'] markers=['x','o','^','+'] def readFiles(csv_dir): - dataframes={} - machine=[] + dataframes=[] + for fname in os.listdir(csv_dir): if '.csv' in fname: - dataframes[fname.rstrip('.csv')]=pd.read_csv(os.path.join(csv_dir,fname),index_col='Time') - machine.append(int(fname.rstrip('.csv')[-1])) - return dataframes,machine + dataframes.append((fname.rstrip('.csv'),int(fname.rstrip('.csv').split('-')[-1])-1,pd.read_csv(os.path.join(csv_dir,fname),index_col='Time'))) + return dataframes def plotCounter(csv_dir, smooth=5): - dataframes,machine= readFiles(csv_dir) + dataframes= readFiles(csv_dir) - for name in dataframes: - df=dataframes[name] + for dataframe in dataframes: + df=dataframe[2] speed=[0] values=list(df.ix[:,0]) for i in range(1,len(values)): @@ -36,39 +35,44 @@ def plotCounter(csv_dir, smooth=5): plt.figure(figsize=(12, 8), dpi=80) plt.title('Counter') - for name in enumerate(dataframes): - if len(markers)>machine[name[0]]: - m=markers[machine[name[0]]] + for dataframe in dataframes: + if len(markers)>dataframe[1]: + m=markers[dataframe[1]] else: m='*' - - dataframes[name[1]].ix[:,0].plot(marker=m,markevery=10,markersize=10) - plt.legend(dataframes.keys()) + dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10) + plt.legend([x[0] for x in dataframes]) plt.figure(figsize=(12, 8), dpi=80) plt.title('dC/dT') - for name in enumerate(dataframes): - if len(markers)>machine[name[0]]: - m=markers[machine[name[0]]] - else: m='*' - pd.rolling_mean(dataframes[name[1]].speed,smooth).plot(marker=m,markevery=10,markersize=10) - plt.legend(dataframes.keys()) + for dataframe in dataframes: + if len(markers)>dataframe[1]: + m=markers[dataframe[1]] + else: m='*' + pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10) + plt.legend([x[0] for x in dataframes]) def plotTimer(csv_dir,smooth=5,std=50): - dataframes,machine= readFiles(csv_dir) + dataframes= readFiles(csv_dir) plt.figure(figsize=(12, 8), dpi=80) plt.title('Timer') - for name in dataframes: - pd.rolling_mean(dataframes[name].ix[:,0],smooth).plot() - plt.legend(dataframes.keys()) + for dataframe in dataframes: + if len(markers)>dataframe[1]: + m=markers[dataframe[1]] + else: m='*' + pd.rolling_mean(dataframe[2].ix[:,0],smooth).plot(marker=m,markevery=10,markersize=10) + plt.legend([x[0] for x in dataframes]) plt.figure(figsize=(12, 8), dpi=80) plt.title('Standard deviance') - for name in dataframes: - pd.rolling_std(dataframes[name].ix[:,0],std).plot() - plt.legend(dataframes.keys()) + for dataframe in dataframes: + if len(markers)>dataframe[1]: + m=markers[dataframe[1]] + else: m='*' + pd.rolling_std(dataframe[2].ix[:,0],std).plot(marker=m,markevery=10,markersize=10) + plt.legend([x[0] for x in dataframes]) diff --git a/flink-addons/flink-streaming/src/test/resources/Performance/copy-files.sh b/flink-addons/flink-streaming/src/test/resources/Performance/copy-files.sh new file mode 100644 index 0000000000000000000000000000000000000000..0259aeb9bbe45673fbafb0404b4ca0a3aa07431a --- /dev/null +++ b/flink-addons/flink-streaming/src/test/resources/Performance/copy-files.sh @@ -0,0 +1,27 @@ +#!/bin/bash +toDir=$1 + +if [ -d "${toDir}" ] ; then + ssh strato@dell150.ilab.sztaki.hu ' + for j in {101..142} 144 145; + do + for i in $(ssh dell$j "ls stratosphere-distrib/log/counter/"); + do scp strato@dell$j:stratosphere-distrib/log/counter/$i stratosphere-distrib/log/all_tests/counter/$i-$j.csv; + done + for i in $(ssh dell$j "ls stratosphere-distrib/log/timer/"); + do scp strato@dell$j:stratosphere-distrib/log/timer/$i stratosphere-distrib/log/all_tests/timer/$i-$j.csv; + done + for i in $(ls stratosphere-distrib/log/counter/); + do cp stratosphere-distrib/log/counter/$i stratosphere-distrib/log/all_tests/counter/$i-150.csv; + done + for i in $(ls stratosphere-distrib/log/timer/); + do cp stratosphere-distrib/log/timer/$i stratosphere-distrib/log/all_tests/timer/$i-150.csv; + done + done + ' + scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/counter/* $toDir/counter/ + scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/timer/* $toDir/timer/ +else + echo "USAGE:" + echo "run " +fi \ No newline at end of file diff --git a/flink-addons/flink-streaming/src/test/resources/Performance/remove-files.sh b/flink-addons/flink-streaming/src/test/resources/Performance/remove-files.sh new file mode 100644 index 0000000000000000000000000000000000000000..3f8326dc7c4a1b688e468257c386c2a0448086c7 --- /dev/null +++ b/flink-addons/flink-streaming/src/test/resources/Performance/remove-files.sh @@ -0,0 +1,10 @@ +#!/bin/bash +ssh strato@dell150.ilab.sztaki.hu ' +for j in {101..142} 144 145; +do + $(ssh dell$j "rm stratosphere-distrib/log/counter/*"); + $(ssh dell$j "rm stratosphere-distrib/log/timer/*"); + rm stratosphere-distrib/log/counter/* + rm stratosphere-distrib/log/timer/* +done +' \ No newline at end of file