提交 22fa55b0 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Implemented getResult() for performance stats in WordCountRemote

上级 687ff805
...@@ -16,5 +16,4 @@ ...@@ -16,5 +16,4 @@
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api.invokable;
public interface UserInvokable { public interface UserInvokable {
} }
...@@ -16,8 +16,7 @@ ...@@ -16,8 +16,7 @@
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api.invokable;
public abstract class UserSinkInvokable implements RecordInvokable { public abstract class UserSinkInvokable implements RecordInvokable {
// TODO consider a common interface with StreamInvokableComponents
//TODO: consider moving this to an interface
public String getResult() { public String getResult() {
return "Override getResult() to pass your own results"; return "Override getResult() to pass your own results";
} }
......
...@@ -33,7 +33,6 @@ import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable; ...@@ -33,7 +33,6 @@ import eu.stratosphere.streaming.api.invokable.DefaultSinkInvokable;
import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.RecordInvokable; import eu.stratosphere.streaming.api.invokable.RecordInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.streamrecord.TupleSizeMismatchException;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.AckEvent; import eu.stratosphere.streaming.faulttolerance.AckEvent;
import eu.stratosphere.streaming.faulttolerance.AckEventListener; import eu.stratosphere.streaming.faulttolerance.AckEventListener;
......
...@@ -22,13 +22,11 @@ import org.apache.commons.logging.LogFactory; ...@@ -22,13 +22,11 @@ import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord; import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.faulttolerance.FailEvent;
import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil; import eu.stratosphere.streaming.faulttolerance.FaultToleranceUtil;
public abstract class StreamInvokableComponent { public abstract class StreamInvokableComponent {
private static final Log log = LogFactory private static final Log log = LogFactory.getLog(StreamInvokableComponent.class);
.getLog(StreamInvokableComponent.class);
private List<RecordWriter<StreamRecord>> outputs; private List<RecordWriter<StreamRecord>> outputs;
...@@ -36,8 +34,8 @@ public abstract class StreamInvokableComponent { ...@@ -36,8 +34,8 @@ public abstract class StreamInvokableComponent {
protected String name; protected String name;
private FaultToleranceUtil emittedRecords; private FaultToleranceUtil emittedRecords;
public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, public final void declareOutputs(List<RecordWriter<StreamRecord>> outputs, String channelID, String name,
String channelID, String name, FaultToleranceUtil emittedRecords) { FaultToleranceUtil emittedRecords) {
this.outputs = outputs; this.outputs = outputs;
this.channelID = channelID; this.channelID = channelID;
this.emittedRecords = emittedRecords; this.emittedRecords = emittedRecords;
...@@ -46,27 +44,30 @@ public abstract class StreamInvokableComponent { ...@@ -46,27 +44,30 @@ public abstract class StreamInvokableComponent {
public final void emit(StreamRecord record) { public final void emit(StreamRecord record) {
record.setId(channelID); record.setId(channelID);
//emittedRecords.addRecord(record); // emittedRecords.addRecord(record);
try { try {
for (RecordWriter<StreamRecord> output : outputs) { for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record); output.emit(record);
log.info("EMITTED: " + record.getId() + " -- " + name); log.info("EMITTED: " + record.getId() + " -- " + name);
} }
} catch (Exception e) { } catch (Exception e) {
//emittedRecords.failRecord(record.getId()); // emittedRecords.failRecord(record.getId());
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to " log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to " + e.getClass().getSimpleName());
+ e.getClass().getSimpleName());
} }
} }
//TODO: Add fault tolerance // TODO: Add fault tolerance
public final void emit(StreamRecord record, int outputChannel) { public final void emit(StreamRecord record, int outputChannel) {
record.setId(channelID); record.setId(channelID);
emittedRecords.addRecord(record,outputChannel); emittedRecords.addRecord(record, outputChannel);
try { try {
outputs.get(outputChannel).emit(record); outputs.get(outputChannel).emit(record);
} catch (Exception e) { } catch (Exception e) {
log.warn("EMIT ERROR: " + e.getClass().getSimpleName() + " -- " + name); log.warn("EMIT ERROR: " + e.getClass().getSimpleName() + " -- " + name);
} }
} }
public String getResult() {
return "Override getResult() to pass your own results";
}
} }
...@@ -91,6 +91,8 @@ public class StreamSource extends AbstractInputTask<DummyIS> { ...@@ -91,6 +91,8 @@ public class StreamSource extends AbstractInputTask<DummyIS> {
public void invoke() throws Exception { public void invoke() throws Exception {
log.debug("SOURCE " + name + " invoked with instance id " + sourceInstanceID); log.debug("SOURCE " + name + " invoked with instance id " + sourceInstanceID);
userFunction.invoke(); userFunction.invoke();
// TODO print to file
System.out.println(userFunction.getResult());
} }
} }
...@@ -86,6 +86,8 @@ public class StreamTask extends AbstractTask { ...@@ -86,6 +86,8 @@ public class StreamTask extends AbstractTask {
public void invoke() throws Exception { public void invoke() throws Exception {
log.debug("TASK " + name + " invoked with instance id " + taskInstanceID); log.debug("TASK " + name + " invoked with instance id " + taskInstanceID);
streamTaskHelper.invokeRecords(userFunction, inputs, name); streamTaskHelper.invokeRecords(userFunction, inputs, name);
// TODO print to file
System.out.println(userFunction.getResult());
log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID); log.debug("TASK " + name + " invoke finished with instance id " + taskInstanceID);
} }
......
...@@ -18,6 +18,8 @@ package eu.stratosphere.streaming.examples.wordcount; ...@@ -18,6 +18,8 @@ package eu.stratosphere.streaming.examples.wordcount;
import java.io.File; import java.io.File;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.log4j.Level; import org.apache.log4j.Level;
...@@ -37,21 +39,29 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord; ...@@ -37,21 +39,29 @@ import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.LogUtils; import eu.stratosphere.streaming.util.LogUtils;
public class WordCountRemote { public class WordCountRemote {
private final static int recordsEmitted = 100000;
private final static int statPerRecords = 10000;
private final static int recordsEmittedFromSplitter = 350000; // recordsEmitted * wordsEmittedInARecord
private final static int statPerRecordsAfterSplit = recordsEmittedFromSplitter / recordsEmitted * statPerRecords;
public static class WordCountDebugSource extends UserSourceInvokable { public static class WordCountDebugSource extends UserSourceInvokable {
int statCounter = 0;
int[] atNumOfRecords = new int[recordsEmitted / statPerRecords + 1];
long[] times = new long[recordsEmitted / statPerRecords + 1];
StreamRecord record = new StreamRecord(new Tuple1<String>()); StreamRecord record = new StreamRecord(new Tuple1<String>());
@Override @Override
public void invoke() throws Exception { public void invoke() throws Exception {
long time = System.currentTimeMillis(); atNumOfRecords[statCounter] = 0;
times[statCounter] = System.currentTimeMillis();
for (int i = 1; i <= 100000; i++) { statCounter++;
if (i % 10000 == 0) { for (int i = 1; i <= recordsEmitted; i++) {
System.out.println("Source: " + i + "\t time: " if (i % statPerRecords == 0) {
+ (System.currentTimeMillis() - time) + " ms"); atNumOfRecords[statCounter] = i;
time = System.currentTimeMillis(); times[statCounter] = System.currentTimeMillis();
statCounter++;
} }
if (i % 2 == 0) { if (i % 2 == 0) {
...@@ -62,22 +72,36 @@ public class WordCountRemote { ...@@ -62,22 +72,36 @@ public class WordCountRemote {
emit(record); emit(record);
} }
} }
@Override
public String getResult() {
StringBuilder result = new StringBuilder("");
result.append("Source result:\n");
for (int i = 0; i < atNumOfRecords.length; i++) {
result.append(atNumOfRecords[i] + ";" + times[i] + ";\n");
}
return result.toString();
}
} }
public static class WordCountDebugSplitter extends UserTaskInvokable { public static class WordCountDebugSplitter extends UserTaskInvokable {
int statCounter = 0;
int[] atNumOfRecords = new int[recordsEmitted / statPerRecords + 1];
long[] times = new long[atNumOfRecords.length];
private String[] words = new String[] {}; private String[] words = new String[] {};
private int i = 0; private int i = 0;
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>()); private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
@Override @Override
public void invoke(StreamRecord record) throws Exception { public void invoke(StreamRecord record) throws Exception {
i++; i++;
if (i % 10000 == 0) { if (i % statPerRecords == 0) {
System.out.println("Splitter: " + i + "\t time: " atNumOfRecords[statCounter] = i;
+ (System.currentTimeMillis() - time) + " ms"); times[statCounter] = System.currentTimeMillis();
time = System.currentTimeMillis(); statCounter++;
} }
words = record.getString(0).split(" "); words = record.getString(0).split(" ");
...@@ -86,16 +110,28 @@ public class WordCountRemote { ...@@ -86,16 +110,28 @@ public class WordCountRemote {
emit(outputRecord); emit(outputRecord);
} }
} }
@Override
public String getResult() {
StringBuilder result = new StringBuilder("");
result.append("Splitter result:\n");
for (int i = 0; i < atNumOfRecords.length; i++) {
result.append(atNumOfRecords[i] + ";" + times[i] + ";\n");
}
return result.toString();
}
} }
public static class WordCountDebugCounter extends UserTaskInvokable { public static class WordCountDebugCounter extends UserTaskInvokable {
int statCounter = 0;
int[] atNumOfRecords = new int[recordsEmitted / statPerRecords + 1];
long[] times = new long[atNumOfRecords.length];
private Map<String, Integer> wordCounts = new HashMap<String, Integer>(); private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = ""; private String word = "";
private Integer count = 0; private Integer count = 0;
private int i = 0; private int i = 0;
private long time = System.currentTimeMillis();
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>()); private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
...@@ -103,11 +139,13 @@ public class WordCountRemote { ...@@ -103,11 +139,13 @@ public class WordCountRemote {
public void invoke(StreamRecord record) throws Exception { public void invoke(StreamRecord record) throws Exception {
word = record.getString(0); word = record.getString(0);
i++; i++;
if (i % 30000 == 0) {
System.out.println("Counter: " + i + "\t time: " if (i % statPerRecordsAfterSplit == 0) {
+ (System.currentTimeMillis() - time) + " ms"); atNumOfRecords[statCounter] = i;
time = System.currentTimeMillis(); times[statCounter] = System.currentTimeMillis();
statCounter++;
} }
if (wordCounts.containsKey(word)) { if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1; count = wordCounts.get(word) + 1;
wordCounts.put(word, count); wordCounts.put(word, count);
...@@ -121,28 +159,42 @@ public class WordCountRemote { ...@@ -121,28 +159,42 @@ public class WordCountRemote {
emit(outRecord); emit(outRecord);
} }
@Override
public String getResult() {
StringBuilder result = new StringBuilder("");
result.append("Counter result:\n");
for (int i = 0; i < atNumOfRecords.length; i++) {
result.append(atNumOfRecords[i] + ";" + times[i] + ";\n");
}
return result.toString();
}
} }
public static class WordCountDebugSink extends UserSinkInvokable { public static class WordCountDebugSink extends UserSinkInvokable {
int nrOfRecords = 0; int nrOfRecords = 0;
private long time = System.currentTimeMillis(); int[] atNumOfRecords = new int[recordsEmitted / statPerRecords + 1];
long[] times = new long[atNumOfRecords.length];
@Override @Override
public void invoke(StreamRecord record) throws Exception { public void invoke(StreamRecord record) throws Exception {
nrOfRecords++; if (nrOfRecords % statPerRecordsAfterSplit == 0) {
if (nrOfRecords % 30000 == 0) { atNumOfRecords[nrOfRecords] = nrOfRecords;
System.out.println("Sink: " + nrOfRecords + "\t time: " times[nrOfRecords] = System.currentTimeMillis();
+ (System.currentTimeMillis() - time) + " ms"); nrOfRecords++;
time = System.currentTimeMillis();
} }
} }
@Override @Override
public String getResult() { public String getResult() {
return String.valueOf(nrOfRecords); StringBuilder result = new StringBuilder("");
result.append("Sink result:\n");
for (int i = 0; i < atNumOfRecords.length; i++) {
result.append(atNumOfRecords[i] + ";" + times[i] + ";\n");
}
result.append("RESULT: " + nrOfRecords);
return result.toString();
} }
} }
private static JobGraph getJobGraph() throws Exception { private static JobGraph getJobGraph() throws Exception {
...@@ -171,8 +223,7 @@ public class WordCountRemote { ...@@ -171,8 +223,7 @@ public class WordCountRemote {
jG.addJar(new Path(file.getAbsolutePath())); jG.addJar(new Path(file.getAbsolutePath()));
Configuration configuration = jG.getJobConfiguration(); Configuration configuration = jG.getJobConfiguration();
Client client = new Client(new InetSocketAddress("hadoop00.ilab.sztaki.hu", 6123), Client client = new Client(new InetSocketAddress("hadoop00.ilab.sztaki.hu", 6123), configuration);
configuration);
client.run(jG, true); client.run(jG, true);
} catch (Exception e) { } catch (Exception e) {
System.out.println(e); System.out.println(e);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册