Commit e4321954 authored by gyfora, committed by Stephan Ewen

[streaming] wordcount example modified for cluster testing

Parent fdb55b54
@@ -44,14 +44,14 @@ public abstract class StreamInvokableComponent {
public final void emit(StreamRecord record) {
record.setId(channelID);
// emittedRecords.addRecord(record);
emittedRecords.addRecord(record);
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
log.info("EMITTED: " + record.getId() + " -- " + name);
}
} catch (Exception e) {
// emittedRecords.failRecord(record.getId());
emittedRecords.failRecord(record.getId());
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to " + e.getClass().getSimpleName());
}
}
......
@@ -22,6 +22,7 @@ import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer;
public class WordCountCounter extends UserTaskInvokable {
@@ -29,33 +30,37 @@ public class WordCountCounter extends UserTaskInvokable {
private String word = "";
private Integer count = 0;
PerformanceCounter perf = new PerformanceCounter("CounterEmitCounter", 1000, 10000);
PerformanceCounter pCounter = new PerformanceCounter("CounterEmitCounter", 1000, 10000);
PerformanceTimer pTimer = new PerformanceTimer("CounterEmitTimer", 1000, 10000, true);
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count=1;
count = 1;
wordCounts.put(word, 1);
}
outRecord.setString(0,word);
outRecord.setInteger(1,count);
outRecord.setString(0, word);
outRecord.setInteger(1, count);
pTimer.startTimer();
emit(outRecord);
perf.count();
pTimer.stopTimer();
pCounter.count();
}
@Override
public String getResult(){
perf.writeCSV("C:/temp/strato/Counter"+channelID+".csv");
public String getResult() {
pCounter.writeCSV("/home/strato/strato-dist/log/counter/Counter" + channelID);
pTimer.writeCSV("/home/strato/strato-dist/log/timer/Counter" + channelID);
return "";
}
}
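
The counter follows the instrumentation pattern applied throughout the example: each emit() is wrapped in pTimer.startTimer()/pTimer.stopTimer(), pCounter.count() is bumped afterwards, and getResult() dumps both to per-channel CSV files for offline plotting. Only the constructor arguments and the method names of PerformanceTimer and PerformanceCounter are visible in this diff; the sketch below is a hypothetical, minimal stand-in (not the project's actual implementation) illustrating what such a timing helper could do.

// Hypothetical minimal stand-in for the timing helper used above; only the
// method names (startTimer, stopTimer, writeCSV) come from the diff, the
// internals are assumptions for illustration.
import java.io.FileNotFoundException;
import java.io.PrintWriter;

public class SimpleEmitTimer {

    private final String name;
    private long startNanos;
    private long totalNanos;
    private long invocations;

    public SimpleEmitTimer(String name) {
        this.name = name;
    }

    // Mark the start of one emit() call.
    public void startTimer() {
        startNanos = System.nanoTime();
    }

    // Accumulate the elapsed time of the call started above.
    public void stopTimer() {
        totalNanos += System.nanoTime() - startNanos;
        invocations++;
    }

    // Write a one-row summary that an offline script could pick up.
    public void writeCSV(String path) throws FileNotFoundException {
        try (PrintWriter out = new PrintWriter(path + ".csv")) {
            out.println("name,invocations,totalNanos");
            out.println(name + "," + invocations + "," + totalNanos);
        }
    }
}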
@@ -19,11 +19,13 @@ import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer;
public class WordCountDummySource2 extends UserSourceInvokable {
StreamRecord record = new StreamRecord(new Tuple1<String>());
PerformanceCounter perf = new PerformanceCounter("SourceEmitCounter", 1000, 10000);
PerformanceCounter pCounter = new PerformanceCounter("SourceEmitCounter", 1000, 1000);
PerformanceTimer pTimer = new PerformanceTimer("SourceEmitTimer", 1000, 1000, true);
public WordCountDummySource2() {
}
@@ -38,14 +40,18 @@ public class WordCountDummySource2 extends UserSourceInvokable {
} else {
record.setString(0, "Gabor Frank");
}
pTimer.startTimer();
emit(record);
perf.count();
pTimer.stopTimer();
pCounter.count();
}
}
@Override
public String getResult() {
perf.writeCSV("C:/temp/strato/Source"+channelID+".csv");
pCounter.writeCSV("/home/strato/log/counter/Source" + channelID);
pTimer.writeCSV("/home/strato/log/timer/Source" + channelID);
return "";
}
}
@@ -28,12 +28,19 @@ import eu.stratosphere.streaming.util.LogUtils;
public class WordCountLocal {
private static JobGraph getJobGraph() throws Exception {
private static JobGraph getJobGraph(int sourceSubtasks, int sourceSubtasksPerInstance,
int splitterSubtasks, int splitterSubtasksPerInstance, int counterSubtasks,
int counterSubtasksPerInstance, int sinkSubtasks, int sinkSubtasksPerInstance)
throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource2.class,1,1);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 3, 3);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.setSource("WordCountSource", WordCountSource.class, sourceSubtasks,
sourceSubtasksPerInstance);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, splitterSubtasks,
splitterSubtasksPerInstance);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, counterSubtasks,
counterSubtasksPerInstance);
graphBuilder.setSink("WordCountSink", WordCountSink.class, sinkSubtasks,
sinkSubtasksPerInstance);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0);
@@ -42,41 +49,75 @@ public class WordCountLocal {
return graphBuilder.getJobGraph();
}
private static void wrongArgs() {
System.out
.println("USAGE:\n"
+ "run <local/cluster> <SOURCE num of subtasks> <SOURCE subtasks per instance> <SPLITTER num of subtasks> <SPLITTER subtasks per instance> <COUNTER num of subtasks> <COUNTER subtasks per instance> <SINK num of subtasks> <SINK subtasks per instance>");
}
// TODO: arguments check
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
if (args.length != 9) {
wrongArgs();
} else {
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
int sourceSubtasks = 1;
int sourceSubtasksPerInstance = 1;
int splitterSubtasks = 1;
int splitterSubtasksPerInstance = 1;
int counterSubtasks = 1;
int counterSubtasksPerInstance = 1;
int sinkSubtasks = 1;
int sinkSubtasksPerInstance = 1;
try {
sourceSubtasks = Integer.parseInt(args[1]);
sourceSubtasksPerInstance = Integer.parseInt(args[2]);
splitterSubtasks = Integer.parseInt(args[3]);
splitterSubtasksPerInstance = Integer.parseInt(args[4]);
counterSubtasks = Integer.parseInt(args[5]);
counterSubtasksPerInstance = Integer.parseInt(args[6]);
sinkSubtasks = Integer.parseInt(args[7]);
sinkSubtasksPerInstance = Integer.parseInt(args[8]);
} catch (Exception e) {
wrongArgs();
}
try {
JobGraph jG = getJobGraph();
Configuration configuration = jG.getJobConfiguration();
try {
JobGraph jG = getJobGraph(sourceSubtasks, sourceSubtasksPerInstance,
splitterSubtasks, splitterSubtasksPerInstance, counterSubtasks,
counterSubtasksPerInstance, sinkSubtasks, sinkSubtasksPerInstance);
Configuration configuration = jG.getJobConfiguration();
if (args.length == 0) {
args = new String[] { "local" };
}
if (args.length == 0) {
args = new String[] { "local" };
}
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
if (args[0].equals("local")) {
System.out.println("Running in Local mode");
NepheleMiniCluster exec = new NepheleMiniCluster();
exec.start();
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
Client client = new Client(new InetSocketAddress("localhost", 6498),
configuration);
client.run(jG, true);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
Client client = new Client(new InetSocketAddress("hadoop01",
6123), configuration);
client.run(jG, true);
}
} catch (Exception e) {
System.out.println(e);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
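
WordCountLocal now takes its degree of parallelism entirely from the command line, in the order given by the usage string above: the execution mode, followed by the number of subtasks and subtasks per instance for the source, splitter, counter, and sink. A purely illustrative launcher, assuming it sits in the same package as WordCountLocal and using made-up argument values and a hypothetical class name, might look like this:

// Hypothetical launcher; the argument values are examples, not values from
// this commit. args[0] selects "local" or "cluster" mode, the remaining
// eight numbers set subtasks and subtasks-per-instance for each vertex.
public class WordCountLocalLauncher {
    public static void main(String[] args) {
        WordCountLocal.main(new String[] {
                "cluster", "1", "1", "4", "2", "4", "2", "1", "1" });
    }
}

With these example values the source runs as a single subtask, the splitter and counter each run four subtasks packed two per instance, and the sink runs as a single subtask.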
@@ -30,7 +30,7 @@ public class WordCountSink extends UserSinkInvokable {
@Override
public String getResult() {
perf.writeCSV("C:/temp/strato/Sink.csv");
perf.writeCSV("/home/strato/strato-dist/log/counter/Sink");
return "";
}
......
@@ -22,6 +22,8 @@ import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer;
public class WordCountSource extends UserSourceInvokable {
@@ -29,24 +31,41 @@ public class WordCountSource extends UserSourceInvokable {
private String line = new String();
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
public WordCountSource() {
try {
br = new BufferedReader(new FileReader("src/test/resources/testdata/hamlet.txt"));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
PerformanceCounter pCounter = new PerformanceCounter("SourceEmitCounter", 1000, 1000);
PerformanceTimer pTimer = new PerformanceTimer("SourceEmitTimer", 1000, 1000, true);
@Override
public void invoke() throws Exception {
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
outRecord.setString(0,line);
// TODO: object reuse
emit(outRecord);
for (int i = 0; i < 10; i++) {
try {
br = new BufferedReader(new FileReader("/home/strato/strato-dist/resources/hamlet.txt"));
line = br.readLine().replaceAll("[\\-\\+\\.\\^:,]", "");
while (line != null) {
if (line != "") {
outRecord.setString(0, line);
// TODO: object reuse
pTimer.startTimer();
emit(outRecord);
pTimer.stopTimer();
pCounter.count();
}
line = br.readLine();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
line = br.readLine();
}
}
@Override
public String getResult() {
pCounter.writeCSV("/home/strato/strato-dist/log/counter/Source" + channelID);
pTimer.writeCSV("/home/strato/strato-dist/log/timer/Source" + channelID);
return "";
}
}
\ No newline at end of file
@@ -19,12 +19,14 @@ import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer;
public class WordCountSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
PerformanceCounter perf = new PerformanceCounter("SplitterEmitCounter", 1000, 10000);
PerformanceCounter pCounter = new PerformanceCounter("SplitterEmitCounter", 1000, 10000);
PerformanceTimer pTimer = new PerformanceTimer("SplitterEmitTimer", 1000, 10000, true);
@Override
public void invoke(StreamRecord record) throws Exception {
@@ -32,14 +34,17 @@ public class WordCountSplitter extends UserTaskInvokable {
words = record.getString(0).split(" ");
for (String word : words) {
outputRecord.setString(0, word);
pTimer.startTimer();
emit(outputRecord);
perf.count();
pTimer.stopTimer();
pCounter.count();
}
}
@Override
public String getResult() {
perf.writeCSV("C:/temp/strato/Splitter"+channelID+".csv");
pCounter.writeCSV("/home/strato/strato-dist/log/counter/Splitter" + channelID);
pTimer.writeCSV("/home/strato/strato-dist/log/timer/Splitter" + channelID);
return "";
}
}
\ No newline at end of file
@@ -9,16 +9,22 @@ import matplotlib.pyplot as plt
import pandas as pd
import os
linestyles = ['_', '-', '--', ':']
markers=['x','o','^','+']
def readFiles(csv_dir):
dataframes={}
machine=[]
for fname in os.listdir(csv_dir):
if '.csv' in fname:
dataframes[fname.rstrip('.csv')]=pd.read_csv(os.path.join(csv_dir,fname),index_col='Time')
return dataframes
machine.append(int(fname.rstrip('.csv')[-1]))
return dataframes,machine
def plotCounter(csv_dir, smooth=5):
dataframes= readFiles(csv_dir)
dataframes,machine= readFiles(csv_dir)
for name in dataframes:
df=dataframes[name]
speed=[0]
@@ -30,21 +36,28 @@ def plotCounter(csv_dir, smooth=5):
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Counter')
for name in dataframes:
dataframes[name].ix[:,0].plot()
for name in enumerate(dataframes):
if len(markers)>machine[name[0]]:
m=markers[machine[name[0]]]
else: m='*'
dataframes[name[1]].ix[:,0].plot(marker=m,markevery=10,markersize=10)
plt.legend(dataframes.keys())
plt.figure(figsize=(12, 8), dpi=80)
plt.title('dC/dT')
for name in dataframes:
pd.rolling_mean(dataframes[name].speed,smooth).plot()
for name in enumerate(dataframes):
if len(markers)>machine[name[0]]:
m=markers[machine[name[0]]]
else: m='*'
pd.rolling_mean(dataframes[name[1]].speed,smooth).plot(marker=m,markevery=10,markersize=10)
plt.legend(dataframes.keys())
def plotTimer(csv_dir,smooth=5,std=50):
dataframes= readFiles(csv_dir)
dataframes,machine= readFiles(csv_dir)
plt.figure(figsize=(12, 8), dpi=80)
plt.title('Timer')
......