提交 687ff805 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] ack and fail event updated + WordCountRemote modified for debugging

上级 84e67e7c
......@@ -46,14 +46,14 @@ public abstract class StreamInvokableComponent {
public final void emit(StreamRecord record) {
record.setId(channelID);
emittedRecords.addRecord(record);
//emittedRecords.addRecord(record);
try {
for (RecordWriter<StreamRecord> output : outputs) {
output.emit(record);
log.info("EMITTED: " + record.getId() + " -- " + name);
}
} catch (Exception e) {
emittedRecords.failRecord(record.getId());
//emittedRecords.failRecord(record.getId());
log.warn("FAILED: " + record.getId() + " -- " + name + " -- due to "
+ e.getClass().getSimpleName());
}
......
......@@ -17,25 +17,140 @@ package eu.stratosphere.streaming.examples.wordcount;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Level;
import eu.stratosphere.api.java.tuple.Tuple1;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.client.program.Client;
import eu.stratosphere.client.program.JobWithJars;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.streaming.api.JobGraphBuilder;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.util.LogUtils;
public class WordCountRemote {
public static class WordCountDebugSource extends UserSourceInvokable {
StreamRecord record = new StreamRecord(new Tuple1<String>());
@Override
public void invoke() throws Exception {
long time = System.currentTimeMillis();
for (int i = 1; i <= 100000; i++) {
if (i % 10000 == 0) {
System.out.println("Source: " + i + "\t time: "
+ (System.currentTimeMillis() - time) + " ms");
time = System.currentTimeMillis();
}
if (i % 2 == 0) {
record.setString(0, "Gyula Marci switched");
} else {
record.setString(0, "Gabor Frank to FINISHED");
}
emit(record);
}
}
}
public static class WordCountDebugSplitter extends UserTaskInvokable {
private String[] words = new String[] {};
private int i = 0;
private StreamRecord outputRecord = new StreamRecord(new Tuple1<String>());
long time = System.currentTimeMillis();
@Override
public void invoke(StreamRecord record) throws Exception {
i++;
if (i % 10000 == 0) {
System.out.println("Splitter: " + i + "\t time: "
+ (System.currentTimeMillis() - time) + " ms");
time = System.currentTimeMillis();
}
words = record.getString(0).split(" ");
for (String word : words) {
outputRecord.setString(0, word);
emit(outputRecord);
}
}
}
public static class WordCountDebugCounter extends UserTaskInvokable {
private Map<String, Integer> wordCounts = new HashMap<String, Integer>();
private String word = "";
private Integer count = 0;
private int i = 0;
private long time = System.currentTimeMillis();
private StreamRecord outRecord = new StreamRecord(new Tuple2<String, Integer>());
@Override
public void invoke(StreamRecord record) throws Exception {
word = record.getString(0);
i++;
if (i % 30000 == 0) {
System.out.println("Counter: " + i + "\t time: "
+ (System.currentTimeMillis() - time) + " ms");
time = System.currentTimeMillis();
}
if (wordCounts.containsKey(word)) {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
} else {
count = 1;
wordCounts.put(word, 1);
}
outRecord.setString(0, word);
outRecord.setInteger(1, count);
emit(outRecord);
}
}
public static class WordCountDebugSink extends UserSinkInvokable {
int nrOfRecords = 0;
private long time = System.currentTimeMillis();
@Override
public void invoke(StreamRecord record) throws Exception {
nrOfRecords++;
if (nrOfRecords % 30000 == 0) {
System.out.println("Sink: " + nrOfRecords + "\t time: "
+ (System.currentTimeMillis() - time) + " ms");
time = System.currentTimeMillis();
}
}
@Override
public String getResult() {
return String.valueOf(nrOfRecords);
}
}
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource2.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.setSource("WordCountSource", WordCountDebugSource.class, 2, 1);
graphBuilder.setTask("WordCountSplitter", WordCountDebugSplitter.class, 2, 1);
graphBuilder.setTask("WordCountCounter", WordCountDebugCounter.class, 2, 1);
graphBuilder.setSink("WordCountSink", WordCountDebugSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0);
......@@ -45,7 +160,7 @@ public class WordCountRemote {
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
try {
File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
......
......@@ -40,6 +40,10 @@ public class AckEvent extends AbstractTaskEvent {
setRecordId(recordId);
}
public AckEvent() {
this.recordId = "";
}
@Override
public void write(DataOutput out) throws IOException {
StringValue recordIdValue = new StringValue(recordId);
......
......@@ -41,6 +41,10 @@ public class FailEvent extends AbstractTaskEvent {
setRecordId(recordId);
}
public FailEvent() {
this.recordId = "";
}
@Override
public void write(DataOutput out) throws IOException {
StringValue recordIdValue = new StringValue(recordId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册