提交 68ebfc48 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] WordCount performance testing

上级 413227c9
......@@ -80,7 +80,7 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
numberOfOutputChannels[i]=taskConfiguration.getInteger("channels_"+i, 0);
}
recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID, numberOfOutputChannels);
recordBuffer = new FaultToleranceUtil(outputs, sourceInstanceID,name, numberOfOutputChannels);
userFunction = (UserSourceInvokable) streamSourceHelper.getUserFunction(
taskConfiguration, outputs, sourceInstanceID, name, recordBuffer);
streamSourceHelper.setAckListener(recordBuffer, sourceInstanceID, outputs);
......
......@@ -68,19 +68,19 @@ public class StreamTask extends AbstractTask {
log.error("Cannot register inputs/outputs for " + getClass().getSimpleName(), e);
}
int[] numberOfOutputChannels= new int[outputs.size()];
for(int i=0; i<numberOfOutputChannels.length;i++ ){
numberOfOutputChannels[i]=taskConfiguration.getInteger("channels_"+i, 0);
int[] numberOfOutputChannels = new int[outputs.size()];
for (int i = 0; i < numberOfOutputChannels.length; i++) {
numberOfOutputChannels[i] = taskConfiguration.getInteger("channels_" + i, 0);
}
recordBuffer = new FaultToleranceUtil(outputs, taskInstanceID, numberOfOutputChannels);
userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(taskConfiguration, outputs, taskInstanceID, name,
recordBuffer);
recordBuffer = new FaultToleranceUtil(outputs, taskInstanceID, name, numberOfOutputChannels);
userFunction = (UserTaskInvokable) streamTaskHelper.getUserFunction(taskConfiguration,
outputs, taskInstanceID, name, recordBuffer);
streamTaskHelper.setAckListener(recordBuffer, taskInstanceID, outputs);
streamTaskHelper.setFailListener(recordBuffer, taskInstanceID, outputs);
}
@Override
public void invoke() throws Exception {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
......
......@@ -21,8 +21,6 @@ import java.util.Map;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer;
public class WordCountCounter extends UserTaskInvokable {
......@@ -30,8 +28,6 @@ public class WordCountCounter extends UserTaskInvokable {
private String word = "";
private Integer count = 0;
PerformanceCounter pCounter = new PerformanceCounter("CounterEmitCounter", 1000, 1000, "");
PerformanceTimer pTimer = new PerformanceTimer("CounterEmitTimer", 1000, 1000, true, "");
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
......@@ -50,17 +46,12 @@ public class WordCountCounter extends UserTaskInvokable {
outRecord.setString(0, word);
outRecord.setInteger(1, count);
pTimer.startTimer();
emit(outRecord);
pTimer.stopTimer();
pCounter.count();
}
@Override
public String getResult() {
pCounter.writeCSV("/home/strato/stratosphere-distrib/log/counter/Counter" + channelID);
pTimer.writeCSV("/home/strato/stratosphere-distrib/log/timer/Counter" + channelID);
return "";
}
}
......@@ -17,20 +17,16 @@ package eu.stratosphere.streaming.examples.wordcount;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
public class WordCountSink extends UserSinkInvokable {
PerformanceCounter perf = new PerformanceCounter("SinkEmitCounter", 1000, 1000, "");
@Override
public void invoke(StreamRecord record) throws Exception {
perf.count();
}
@Override
public String getResult() {
perf.writeCSV("/home/strato/stratosphere-distrib/log/counter/Sink");
return "";
}
......
......@@ -22,8 +22,6 @@ import java.io.FileReader;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTimer;
public class WordCountSourceSplitter extends UserSourceInvokable {
......@@ -31,13 +29,11 @@ public class WordCountSourceSplitter extends UserSourceInvokable {
private String line = new String();
private StreamRecord outRecord = new StreamRecord(new Tuple1<String>());
PerformanceCounter pCounter = new PerformanceCounter("SourceEmitCounter", 1000, 1000, "");
PerformanceTimer pTimer = new PerformanceTimer("SourceEmitTimer", 1000, 1000, true, "");
@Override
public void invoke() throws Exception {
for (int i = 0; i < 10; i++) {
while (true) {
try {
br = new BufferedReader(new FileReader("/home/strato/stratosphere-distrib/resources/hamlet.txt"));
......@@ -46,10 +42,7 @@ public class WordCountSourceSplitter extends UserSourceInvokable {
if (line != "") {
for (String word : line.split(" ")) {
outRecord.setString(0, word);
pTimer.startTimer();
emit(outRecord);
pTimer.stopTimer();
pCounter.count();
}
}
line = br.readLine();
......@@ -58,20 +51,16 @@ public class WordCountSourceSplitter extends UserSourceInvokable {
} catch (FileNotFoundException e) {
e.printStackTrace();
}
// Thread.sleep(1);
}
Thread.sleep(15000);
emit(outRecord);
Thread.sleep(15000);
emit(outRecord);
}
@Override
public String getResult() {
pCounter.writeCSV("/home/strato/stratosphere-distrib/log/counter/Source" + channelID);
pTimer.writeCSV("/home/strato/stratosphere-distrib/log/timer/Source" + channelID);
return "";
}
}
\ No newline at end of file
......@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.api.streamrecord.UID;
import eu.stratosphere.streaming.util.PerformanceCounter;
import eu.stratosphere.streaming.util.PerformanceTracker;
/**
......@@ -42,7 +43,8 @@ public class FaultToleranceUtil {
boolean exactlyOnce;
private FaultToleranceBuffer buffer;
public PerformanceTracker counter;
public PerformanceTracker tracker;
public PerformanceCounter counter;
/**
* Creates fault tolerance buffer object for the given output channels and
......@@ -71,10 +73,35 @@ public class FaultToleranceUtil {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
counter = new PerformanceTracker("pc", 1000, 1000,14900, "/home/strato/stratosphere-distrib/log/counter/Buffer" + sourceInstanceID);
tracker = new PerformanceTracker("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/Buffer" + sourceInstanceID);
counter = new PerformanceCounter("pc", 1000, 1000, 30000,
"/home/strato/stratosphere-distrib/log/counter/Emitted" + sourceInstanceID);
}
public FaultToleranceUtil(List<RecordWriter<StreamRecord>> outputs, int sourceInstanceID,
String componentName, int[] numberOfChannels) {
this.outputs = outputs;
this.componentID = sourceInstanceID;
exactlyOnce = false;
if (exactlyOnce) {
this.buffer = new ExactlyOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
} else {
this.buffer = new AtLeastOnceFaultToleranceBuffer(numberOfChannels, sourceInstanceID);
}
tracker = new PerformanceTracker("pc", 1000, 1000, 10000,
"/home/strato/stratosphere-distrib/log/counter/Buffer" + componentName
+ sourceInstanceID);
counter = new PerformanceCounter("pc", 1000, 1000, 10000,
"/home/strato/stratosphere-distrib/log/counter/Emitted" + componentName
+ sourceInstanceID);
}
/**
* Adds the record to the fault tolerance buffer. This record will be
* monitored for acknowledgements and timeout.
......@@ -85,7 +112,8 @@ public class FaultToleranceUtil {
public void addRecord(StreamRecord streamRecord) {
buffer.add(streamRecord);
counter.track(this.buffer.recordBuffer.size());
tracker.track(this.buffer.recordBuffer.size());
counter.count();
}
public void addRecord(StreamRecord streamRecord, int output) {
......@@ -107,7 +135,7 @@ public class FaultToleranceUtil {
// TODO: find a place to call timeoutRecords
public void ackRecord(UID recordID, int channel) {
buffer.ack(recordID, channel);
tracker.track(this.buffer.recordBuffer.size());
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册