提交 76dd218d 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] performance test scripts updated

上级 ce548603
......@@ -9,8 +9,10 @@ import matplotlib.pyplot as plt
import pandas as pd
import os
linestyles = ['_', '-', '--', ':']
markers=['x','o','^','+']
markers=['D','s', '|', '', 'x', '_', '^', ' ', 'd', 'h', '+', '*', ',', 'o', '.', '1', 'p', 'H', 'v', '>'];
colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k']
def readFiles(csv_dir):
dataframes=[]
......@@ -36,9 +38,9 @@ def plotCounter(csv_dir, smooth=5):
plt.title('Counter')
for dataframe in dataframes:
if len(markers)>dataframe[1]:
m=markers[dataframe[1]]
else: m='*'
m=markers[dataframe[1]%len(markers)]
dataframe[2].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
......@@ -46,9 +48,9 @@ def plotCounter(csv_dir, smooth=5):
plt.title('dC/dT')
for dataframe in dataframes:
if len(markers)>dataframe[1]:
m=markers[dataframe[1]]
else: m='*'
m=markers[dataframe[1]%len(markers)]
pd.rolling_mean(dataframe[2].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
......@@ -61,9 +63,9 @@ def plotTimer(csv_dir,smooth=5,std=50):
plt.title('Timer')
for dataframe in dataframes:
if len(markers)>dataframe[1]:
m=markers[dataframe[1]]
else: m='*'
m=markers[dataframe[1]%len(markers)]
pd.rolling_mean(dataframe[2].ix[:,0],smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
......@@ -71,8 +73,8 @@ def plotTimer(csv_dir,smooth=5,std=50):
plt.title('Standard deviance')
for dataframe in dataframes:
if len(markers)>dataframe[1]:
m=markers[dataframe[1]]
else: m='*'
m=markers[dataframe[1]%len(markers)]
pd.rolling_std(dataframe[2].ix[:,0],std).plot(marker=m,markevery=10,markersize=10)
plt.legend([x[0] for x in dataframes])
package storm.starter;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import storm.starter.spout.RandomSentenceSpout;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.ShellBolt;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
/**
* This topology demonstrates Storm's stream groupings and multilang
* capabilities.
*/
public class WordCountTopology {
public static class HamletSpout extends BaseRichSpout {
SpoutOutputCollector _collector;
final static String path = "/home/hermann/asd.txt";
final static int emitHamlets = 5;
BufferedReader br = null;
int numberOfHamlets;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
try {
br = new BufferedReader(new FileReader(path));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
numberOfHamlets = 0;
}
@Override
public void nextTuple() {
Utils.sleep(100);
String line = "";
line = getLine();
if (line != null) {
_collector.emit(new Values(line));
}
}
public String getLine() {
String line;
try {
line = br.readLine();
if (line == null && numberOfHamlets < emitHamlets) {
numberOfHamlets++;
br = new BufferedReader(new FileReader(path));
line = br.readLine();
}
return line;
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
@Override
public void ack(Object id) {
}
@Override
public void fail(Object id) {
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new HamletSpout(), 1);
builder.setBolt("split", new SplitSentence(), 1).shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 1).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
}
#!/bin/bash
toDir=$1
if [ -d "${toDir}" ] ; then
ssh strato@dell150.ilab.sztaki.hu '
for j in {101..142} 144 145;
do
for i in $(ssh dell$j "ls stratosphere-distrib/log/counter/");
do scp strato@dell$j:stratosphere-distrib/log/counter/$i stratosphere-distrib/log/all_tests/counter/$i-$j.csv;
done
for i in $(ssh dell$j "ls stratosphere-distrib/log/timer/");
do scp strato@dell$j:stratosphere-distrib/log/timer/$i stratosphere-distrib/log/all_tests/timer/$i-$j.csv;
done
for i in $(ls stratosphere-distrib/log/counter/);
do cp stratosphere-distrib/log/counter/$i stratosphere-distrib/log/all_tests/counter/$i-150.csv;
done
for i in $(ls stratosphere-distrib/log/timer/);
do cp stratosphere-distrib/log/timer/$i stratosphere-distrib/log/all_tests/timer/$i-150.csv;
done
done
'
scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/all_tests/counter/* $toDir/counter/
scp strato@dell150.ilab.sztaki.hu:stratosphere-distrib/log/all_tests/timer/* $toDir/timer/
else
echo "USAGE:"
echo "run <directory>"
fi
\ No newline at end of file
#!/bin/bash
ssh strato@dell150.ilab.sztaki.hu '
for j in {101..142} 144 145;
do
$(ssh dell$j 'rm stratosphere-distrib/log/counter/*');
$(ssh dell$j 'rm stratosphere-distrib/log/timer/*');
done
rm stratosphere-distrib/log/counter/*
rm stratosphere-distrib/log/timer/*
rm stratosphere-distrib/log/all_tests/counter/*
rm stratosphere-distrib/log/all_tests/timer/*
'
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册