diff --git a/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py b/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py
index b845d9dce384e075b0b50db468d8590b7bd4f515..fa481c445563cff6e3586de17e660f204e7538f5 100755
--- a/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py
+++ b/flink-addons/flink-streaming/src/test/resources/Performance/PerformanceTracker.py
@@ -9,8 +9,10 @@ import matplotlib.pyplot as plt
 import pandas as pd
 import os
 
+
 linestyles = ['_', '-', '--', ':']
-markers=['x','o','^','+']
+markers=['D','s', '|', '', 'x', '_', '^', ' ', 'd', 'h', '+', '*', ',', 'o', '.', '1', 'p', 'H', 'v', '>'];
+colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
 
 def readFiles(csv_dir):
     dataframes=[]
@@ -36,9 +38,9 @@ def plotCounter(csv_dir, smooth=5):
     plt.title('Counter')
 
     for dataframe in dataframes:
-        if len(markers)>dataframe[1]:
-            m=markers[dataframe[1]]
-        else: m='*'
+
+        m=markers[dataframe[1]%len(markers)]
+
         dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
 
     plt.legend([x[0] for x in dataframes])
@@ -46,9 +48,9 @@
     plt.title('dC/dT')
 
     for dataframe in dataframes:
-        if len(markers)>dataframe[1]:
-            m=markers[dataframe[1]]
-        else: m='*'
+
+        m=markers[dataframe[1]%len(markers)]
+
         pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
 
     plt.legend([x[0] for x in dataframes])
@@ -61,9 +63,9 @@ def plotTimer(csv_dir,smooth=5,std=50):
     plt.title('Timer')
 
     for dataframe in dataframes:
-        if len(markers)>dataframe[1]:
-            m=markers[dataframe[1]]
-        else: m='*'
+
+        m=markers[dataframe[1]%len(markers)]
+
         pd.rolling_mean(dataframe[2].ix[:,0],smooth).plot(marker=m,markevery=10,markersize=10)
 
     plt.legend([x[0] for x in dataframes])
@@ -71,8 +73,8 @@
     plt.title('Standard deviance')
 
     for dataframe in dataframes:
-        if len(markers)>dataframe[1]:
-            m=markers[dataframe[1]]
-        else: m='*'
+
+        m=markers[dataframe[1]%len(markers)]
+
         pd.rolling_std(dataframe[2].ix[:,0],std).plot(marker=m,markevery=10,markersize=10)
     plt.legend([x[0] for x in dataframes])
diff --git a/flink-addons/flink-streaming/src/test/resources/Performance/WordCountTopology.java b/flink-addons/flink-streaming/src/test/resources/Performance/WordCountTopology.java
new file mode 100644
index 0000000000000000000000000000000000000000..ddd4f45d15c655c7ac17752a550ddbf7173d013c
--- /dev/null
+++ b/flink-addons/flink-streaming/src/test/resources/Performance/WordCountTopology.java
@@ -0,0 +1,160 @@
+package storm.starter;
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import storm.starter.spout.RandomSentenceSpout;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.ShellBolt;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseBasicBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+/**
+ * This topology demonstrates Storm's stream groupings and multilang
+ * capabilities.
+ */
+public class WordCountTopology {
+
+    public static class HamletSpout extends BaseRichSpout {
+        SpoutOutputCollector _collector;
+        final static String path = "/home/hermann/asd.txt";
+        final static int emitHamlets = 5;
+
+        BufferedReader br = null;
+        int numberOfHamlets;
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            _collector = collector;
+            try {
+                br = new BufferedReader(new FileReader(path));
+            } catch (FileNotFoundException e) {
+                e.printStackTrace();
+            }
+
+            numberOfHamlets = 0;
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(100);
+            String line = "";
+            line = getLine();
+
+            if (line != null) {
+                _collector.emit(new Values(line));
+            }
+        }
+
+        public String getLine() {
+            String line;
+            try {
+                line = br.readLine();
+                if (line == null && numberOfHamlets < emitHamlets) {
+                    numberOfHamlets++;
+                    br = new BufferedReader(new FileReader(path));
+                    line = br.readLine();
+                }
+                return line;
+            } catch (IOException e) {
+                e.printStackTrace();
+                return null;
+            }
+        }
+
+        @Override
+        public void ack(Object id) {
+        }
+
+        @Override
+        public void fail(Object id) {
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+
+    public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+        public SplitSentence() {
+            super("python", "splitsentence.py");
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+
+        @Override
+        public Map getComponentConfiguration() {
+            return null;
+        }
+    }
+
+    public static class WordCount extends BaseBasicBolt {
+        Map counts = new HashMap();
+
+        @Override
+        public void execute(Tuple tuple, BasicOutputCollector collector) {
+            String word = tuple.getString(0);
+            Integer count = counts.get(word);
+            if (count == null)
+                count = 0;
+            count++;
+            counts.put(word, count);
+            collector.emit(new Values(word, count));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word", "count"));
+        }
+    }
+
+    public static void main(String[] args) throws Exception {
+
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("spout", new HamletSpout(), 1);
+
+        builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
+        builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word"));
+
+        Config conf = new Config();
+        conf.setDebug(true);
+
+        if (args != null && args.length > 0) {
+            conf.setNumWorkers(3);
+
+            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
+        } else {
+            conf.setMaxTaskParallelism(3);
+
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("word-count", conf, builder.createTopology());
+
+            Thread.sleep(10000);
+
+            cluster.shutdown();
+        }
+    }
+}
diff --git a/flink-addons/flink-streaming/src/test/resources/Performance/copy-files.sh b/flink-addons/flink-streaming/src/test/resources/Performance/copy-files.sh
new file mode 100755
index 0000000000000000000000000000000000000000..ca2f71c2813d9739929d714ad3a49b17dc4b359a
--- /dev/null
+++ b/flink-addons/flink-streaming/src/test/resources/Performance/copy-files.sh
@@ -0,0 +1,27 @@
+#!/bin/bash
+toDir=$1
+
+if [ -d "${toDir}" ] ; then
+	ssh strato@dell150.ilab.sztaki.hu '
+	for j in {101..142} 144 145;
+	do
+		for i in $(ssh dell$j "ls stratosphere-distrib/log/counter/");
+			do scp strato@dell$j:stratosphere-distrib/log/counter/$i stratosphere-distrib/log/all_tests/counter/$i-$j.csv;
+		done
+		for i in $(ssh dell$j "ls stratosphere-distrib/log/timer/");
+			do scp strato@dell$j:stratosphere-distrib/log/timer/$i stratosphere-distrib/log/all_tests/timer/$i-$j.csv;
+		done
+		for i in $(ls stratosphere-distrib/log/counter/);
+			do cp stratosphere-distrib/log/counter/$i stratosphere-distrib/log/all_tests/counter/$i-150.csv;
+		done
+		for i in $(ls stratosphere-distrib/log/timer/);
+			do cp stratosphere-distrib/log/timer/$i stratosphere-distrib/log/all_tests/timer/$i-150.csv;
+		done
+	done
+	'
+	scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/all_tests/counter/* $toDir/counter/
+	scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/all_tests/timer/* $toDir/timer/
+else
+	echo "USAGE:"
+	echo "run "
+fi
\ No newline at end of file
diff --git a/flink-addons/flink-streaming/src/test/resources/Performance/remove-files.sh b/flink-addons/flink-streaming/src/test/resources/Performance/remove-files.sh
new file mode 100755
index 0000000000000000000000000000000000000000..02220c8b9d3fcb2a505372f4152a2185249f7a6b
--- /dev/null
+++ b/flink-addons/flink-streaming/src/test/resources/Performance/remove-files.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+ssh strato@dell150.ilab.sztaki.hu '
+for j in {101..142} 144 145;
+do
+	$(ssh dell$j 'rm stratosphere-distrib/log/counter/*');
+	$(ssh dell$j 'rm stratosphere-distrib/log/timer/*');
+
+done
+
+rm stratosphere-distrib/log/counter/*
+rm stratosphere-distrib/log/timer/*
+rm stratosphere-distrib/log/all_tests/counter/*
+rm stratosphere-distrib/log/all_tests/timer/*
+'
+
+
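Note on the marker-selection change in PerformanceTracker.py above: instead of falling back to '*' once a series index runs past the marker list, the patched code wraps the index with the modulo operator, so every plotted series gets a marker and the styles simply repeat when the list is exhausted. A minimal standalone sketch of that idea (the shortened marker list and the loop below are illustrative, not taken from the patch):

    # Sketch: pick a marker per series by wrapping the index, as the patched script does.
    markers = ['D', 's', '|', 'x', '^', 'o']
    for series_index in range(10):          # series_index plays the role of dataframe[1]
        m = markers[series_index % len(markers)]
        print(series_index, m)              # index 6 wraps back to 'D', 7 to 's', and so on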