提交 fe142630 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] performance tracker class added

上级 22fa55b0
......@@ -18,8 +18,6 @@ package eu.stratosphere.streaming.examples.wordcount;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Level;
......@@ -37,115 +35,76 @@ import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.LogUtils;
import eu.stratosphere.streaming.util.PerformanceTracker;
public class WordCountRemote {
private final static int recordsEmitted = 100000;
private final static int statPerRecords = 10000;
private final static int recordsEmittedFromSplitter = 350000; // recordsEmitted * wordsEmittedInARecord
private final static int statPerRecordsAfterSplit = recordsEmittedFromSplitter / recordsEmitted * statPerRecords;
/**
 * Debug source for the remote word count job.
 *
 * <p>Emits {@code recordsEmitted} single-field string records, alternating
 * between two fixed sentences, and counts every emitted record with a
 * {@link PerformanceTracker}.
 *
 * <p>The hand-rolled {@code statCounter}/{@code atNumOfRecords}/{@code times}
 * bookkeeping has been dropped in favour of the tracker: keeping both left
 * {@link #getResult()} with two consecutive {@code return} statements, the
 * second of which is unreachable and does not compile.
 */
public static class WordCountDebugSource extends UserSourceInvokable {

	/** Stores one (timestamp, running total) sample per 10000 emitted records. */
	private PerformanceTracker perf = new PerformanceTracker(1000, 10000);

	/** Single record instance that is refilled and re-emitted on every iteration. */
	StreamRecord record = new StreamRecord(new Tuple1<String>());

	/**
	 * Emits the test records and counts each one.
	 *
	 * @throws Exception declared by the overridden method
	 */
	@Override
	public void invoke() throws Exception {
		for (int i = 1; i <= recordsEmitted; i++) {
			// Even iterations send the first sentence, odd iterations the second.
			if (i % 2 == 0) {
				record.setString(0, "Gyula Marci switched");
			} else {
				record.setString(0, "Gabor Frank to FINISHED");
			}
			emit(record);
			perf.count();
		}
	}

	/**
	 * @return the samples collected by the tracker, rendered as CSV
	 */
	@Override
	public String getResult() {
		return perf.createCSV();
	}
}
/**
 * Debug splitter for the remote word count job.
 *
 * <p>Splits field 0 of every incoming record on single spaces, emits one
 * record per word and counts every emitted word with a
 * {@link PerformanceTracker}.
 *
 * <p>The manual {@code statCounter}/{@code atNumOfRecords}/{@code times}
 * bookkeeping (and the private record counter that only fed it) has been
 * dropped in favour of the tracker: keeping both left {@link #getResult()}
 * with an unreachable second {@code return}, which does not compile.
 */
public static class WordCountDebugSplitter extends UserTaskInvokable {

	/** Stores one (timestamp, running total) sample per 10000 emitted words. */
	private PerformanceTracker perf = new PerformanceTracker(1000, 10000);

	/** Words of the record currently being processed. */
	private String[] words = new String[] {};

	/** Single output record that is refilled and re-emitted for every word. */
	private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());

	// NOTE(review): set at construction and never read in this class; kept
	// because it is not private. Confirm whether it can be removed.
	long time = System.currentTimeMillis();

	/**
	 * Emits one record per space-separated word of the input.
	 *
	 * @param record incoming record; field 0 is read as the sentence to split
	 * @throws Exception declared by the overridden method
	 */
	@Override
	public void invoke(StreamRecord record) throws Exception {
		words = record.getString(0).split(" ");
		for (String word : words) {
			outputRecord.setString(0, word);
			emit(outputRecord);
			perf.count();
		}
	}

	/**
	 * @return the samples collected by the tracker, rendered as CSV
	 */
	@Override
	public String getResult() {
		return perf.createCSV();
	}
}
public static class WordCountDebugCounter extends UserTaskInvokable {
int statCounter = 0;
int[] atNumOfRecords = new int[recordsEmitted / statPerRecords + 1];
long[] times = new long[atNumOfRecords.length];
private PerformanceTracker perf = new PerformanceTracker(1000, 10000);
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
private Integer count = 0;
private int i = 0;
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
i++;
if (i % statPerRecordsAfterSplit == 0) {
atNumOfRecords[statCounter] = i;
times[statCounter] = System.currentTimeMillis();
statCounter++;
}
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
......@@ -158,42 +117,26 @@ public class WordCountRemote {
outRecord.setInteger(1, count);
emit(outRecord);
perf.count();
}
@Override
public String getResult() {
StringBuilder result = new StringBuilder("");
result.append("Counter result:\n");
for (int i = 0; i < atNumOfRecords.length; i++) {
result.append(atNumOfRecords[i] + ";" + times[i] + ";\n");
}
return result.toString();
return perf.createCSV();
}
}
/**
 * Debug sink for the remote word count job.
 *
 * <p>Discards every incoming record and only counts it with a
 * {@link PerformanceTracker}.
 *
 * <p>The manual {@code nrOfRecords}/{@code atNumOfRecords}/{@code times}
 * bookkeeping has been dropped in favour of the tracker. Besides leaving
 * {@link #getResult()} with an unreachable second {@code return}, it was
 * broken on its own: {@code nrOfRecords} was only incremented inside the
 * {@code nrOfRecords % statPerRecordsAfterSplit == 0} branch, so it could
 * never advance past 1.
 */
public static class WordCountDebugSink extends UserSinkInvokable {

	/** Stores one (timestamp, running total) sample per 10000 received records. */
	private PerformanceTracker perf = new PerformanceTracker(1000, 10000);

	/**
	 * Counts the received record; its content is not inspected.
	 *
	 * @param record incoming record (unused)
	 * @throws Exception declared by the overridden method
	 */
	@Override
	public void invoke(StreamRecord record) throws Exception {
		perf.count();
	}

	/**
	 * @return the samples collected by the tracker, rendered as CSV
	 */
	@Override
	public String getResult() {
		return perf.createCSV();
	}
}
......@@ -202,7 +145,7 @@ public class WordCountRemote {
graphBuilder.setSource("WordCountSource", WordCountDebugSource.class, 2, 1);
graphBuilder.setTask("WordCountSplitter", WordCountDebugSplitter.class, 2, 1);
graphBuilder.setTask("WordCountCounter", WordCountDebugCounter.class, 2, 1);
graphBuilder.setSink("WordCountSink", WordCountDebugSink.class);
graphBuilder.setSink("WordCountSink", WordCountDebugSink.class,2,1);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0);
......@@ -223,7 +166,8 @@ public class WordCountRemote {
jG.addJar(new Path(file.getAbsolutePath()));
Configuration configuration = jG.getJobConfiguration();
Client client = new Client(new InetSocketAddress("hadoop00.ilab.sztaki.hu", 6123), configuration);
Client client = new Client(new InetSocketAddress("hadoop00.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
} catch (Exception e) {
System.out.println(e);
......
package eu.stratosphere.streaming.util;
import java.io.FileNotFoundException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
/**
 * In-memory recorder for simple performance measurements.
 *
 * <p>Every recorded sample consists of the wall-clock time at which it was
 * taken ({@link System#currentTimeMillis()}), a numeric value and a label.
 * Samples are added either directly through the {@code track} methods or
 * periodically through the {@code count} methods, which keep a running total
 * and store it once every {@code countInterval} calls.
 *
 * <p>This class performs no synchronization.
 */
public class PerformanceTracker {

	/** Initial list capacity used by the no-argument constructor. */
	private static final int DEFAULT_CAPACITY = 10;

	// The fields below are package-visible; the unit test in this package
	// reads them directly.

	/** Time of each sample, in milliseconds since the epoch. */
	List<Long> timeStamps;
	/** Value of each sample; same index as {@link #timeStamps}. */
	List<Long> values;
	/** Label of each sample; same index as {@link #timeStamps}. */
	List<String> labels;

	/** Running total of all increments passed to {@code count}. */
	long counter;
	/** Number of {@code count} calls between two stored samples. */
	long countInterval;
	/** Number of {@code count} calls since the last stored sample. */
	long counts;

	/**
	 * Creates a tracker that stores a sample on every {@code count} call.
	 */
	public PerformanceTracker() {
		this(DEFAULT_CAPACITY, 1);
	}

	/**
	 * Creates a tracker with pre-sized storage.
	 *
	 * @param counterLength initial capacity of the sample lists
	 * @param countInterval number of {@code count} calls between two stored
	 *            samples; must be at least 1
	 * @throws IllegalArgumentException if {@code countInterval} is smaller
	 *             than 1 (an interval of 0 would otherwise make every
	 *             {@code count} call fail with a division by zero) or if
	 *             {@code counterLength} is negative
	 */
	public PerformanceTracker(int counterLength, int countInterval) {
		if (countInterval < 1) {
			throw new IllegalArgumentException("countInterval must be at least 1, but was "
					+ countInterval);
		}
		timeStamps = new ArrayList<Long>(counterLength);
		values = new ArrayList<Long>(counterLength);
		labels = new ArrayList<String>(counterLength);
		this.countInterval = countInterval;
		counter = 0;
		counts = 0;
	}

	/**
	 * Stores a sample with the current time.
	 *
	 * @param value value of the sample
	 * @param label label of the sample
	 */
	public void track(Long value, String label) {
		addSample(value, label);
	}

	/**
	 * Stores a sample with the current time and an empty label.
	 *
	 * @param value value of the sample
	 */
	public void track(Long value) {
		track(value, "");
	}

	/**
	 * Stores a sample with the current time.
	 *
	 * @param value value of the sample
	 * @param label label of the sample
	 */
	public void track(int value, String label) {
		track(Long.valueOf(value), label);
	}

	/**
	 * Stores a sample with the current time and an empty label.
	 *
	 * @param value value of the sample
	 */
	public void track(int value) {
		track(Long.valueOf(value), "");
	}

	/**
	 * Stores a sample with value 1, the current time and an empty label.
	 */
	public void track() {
		track(1);
	}

	/**
	 * Adds {@code i} to the running total and, on every
	 * {@code countInterval}-th call, stores the total as a sample.
	 *
	 * @param i amount added to the running total
	 * @param label label used if this call stores a sample
	 */
	public void count(long i, String label) {
		counter = counter + i;
		counts++;
		if (counts % countInterval == 0) {
			counts = 0;
			addSample(counter, label);
		}
	}

	/**
	 * Same as {@link #count(long, String)} with an empty label.
	 *
	 * @param i amount added to the running total
	 */
	public void count(long i) {
		count(i, "");
	}

	/**
	 * Same as {@link #count(long, String)} with an increment of 1.
	 *
	 * @param label label used if this call stores a sample
	 */
	public void count(String label) {
		count(1, label);
	}

	/**
	 * Same as {@link #count(long, String)} with an increment of 1 and an
	 * empty label.
	 */
	public void count() {
		count(1, "");
	}

	/**
	 * Renders all stored samples as CSV.
	 *
	 * <p>The first line is the header {@code Time,Value,Label}; every sample
	 * follows on its own line, in insertion order. Labels containing a comma,
	 * a double quote or a line break are enclosed in double quotes (with
	 * embedded quotes doubled) so that they cannot break the row structure.
	 *
	 * @return the CSV text, each line terminated by {@code \n}
	 */
	public String createCSV() {
		StringBuilder csv = new StringBuilder();
		csv.append("Time,Value,Label\n");

		for (int i = 0; i < timeStamps.size(); i++) {
			csv.append(timeStamps.get(i)).append(',');
			csv.append(values.get(i)).append(',');
			csv.append(escapeCsv(labels.get(i))).append('\n');
		}

		return csv.toString();
	}

	/**
	 * Writes the output of {@link #createCSV()} to a file. If the file cannot
	 * be opened, a message is printed to standard output and nothing is
	 * written.
	 *
	 * @param file name of the output file
	 */
	public void writeCSV(String file) {
		PrintWriter out = null;
		try {
			out = new PrintWriter(file);
			out.print(createCSV());
		} catch (FileNotFoundException e) {
			System.out.println("CSV output file not found");
		} finally {
			// Release the file handle even if rendering or printing fails.
			if (out != null) {
				out.close();
			}
		}
	}

	/** Appends one sample stamped with the current time. */
	private void addSample(Long value, String label) {
		timeStamps.add(System.currentTimeMillis());
		values.add(value);
		labels.add(label);
	}

	/** Quotes a CSV field if it contains a separator, a quote or a line break. */
	private static String escapeCsv(String field) {
		if (field == null) {
			// Same text that plain string concatenation produces for null.
			return "null";
		}
		boolean needsQuoting = field.indexOf(',') >= 0 || field.indexOf('"') >= 0
				|| field.indexOf('\n') >= 0 || field.indexOf('\r') >= 0;
		if (!needsQuoting) {
			return field;
		}
		return "\"" + field.replace("\"", "\"\"") + "\"";
	}
}
package eu.stratosphere.streaming.util;
import static org.junit.Assert.*;
import org.junit.Test;
/**
 * Unit tests for {@link PerformanceTracker}.
 */
public class PerformanceTrackerTest {

	/** A freshly created tracker holds no samples and samples on every count. */
	@Test
	public void testPerformanceTracker() {
		PerformanceTracker tracker = new PerformanceTracker();

		assertEquals(0, tracker.timeStamps.size());
		assertEquals(0, tracker.values.size());
		assertEquals(0, tracker.labels.size());
		assertEquals(0, tracker.counter);
		assertEquals(1, tracker.countInterval);
	}

	/** The {@code Long} overloads store the value and default to an empty label. */
	@Test
	public void testTrackLong() {
		PerformanceTracker tracker = new PerformanceTracker();
		tracker.track(Long.valueOf(5));
		tracker.track(Long.valueOf(7), "seven");

		assertEquals(2, tracker.timeStamps.size());
		assertEquals(2, tracker.values.size());
		assertEquals(Long.valueOf(5), tracker.values.get(0));
		assertEquals("", tracker.labels.get(0));
		assertEquals(Long.valueOf(7), tracker.values.get(1));
		assertEquals("seven", tracker.labels.get(1));
	}

	/** {@code track()} stores the value 1, {@code track(int)} the given value. */
	@Test
	public void testTrack() {
		PerformanceTracker pT = new PerformanceTracker();
		pT.track();
		pT.track(3);

		assertEquals(2, pT.timeStamps.size());
		assertEquals(2, pT.values.size());

		assertEquals(Long.valueOf(1), pT.values.get(0));
		assertEquals(Long.valueOf(3), pT.values.get(1));

		System.out.println(pT.createCSV());
		System.out.println("--------------");
	}

	/** {@code count} stores the running total once per count interval. */
	@Test
	public void testCount() {
		// Interval 1: every call stores the running total.
		PerformanceTracker pT = new PerformanceTracker();
		pT.count();
		pT.count(10);
		pT.count();

		assertEquals(3, pT.timeStamps.size());
		assertEquals(3, pT.values.size());

		assertEquals(Long.valueOf(1), pT.values.get(0));
		assertEquals(Long.valueOf(11), pT.values.get(1));
		assertEquals(Long.valueOf(12), pT.values.get(2));

		System.out.println(pT.createCSV());
		System.out.println("--------------");

		// Interval 10000: 10,000,000 calls must yield exactly 1000 samples.
		PerformanceTracker pT2 = new PerformanceTracker(1000, 10000);
		for (int i = 0; i < 10000000; i++) {
			pT2.count("test");
		}

		assertEquals(1000, pT2.timeStamps.size());
	}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册