提交 87c11cfd 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Making a copy of Record in StreamRecord

上级 e34bcd2c
......@@ -17,15 +17,14 @@ public final class StreamRecord {
}
public StreamRecord(Record record, String channelID) {
this(record);
record.addField(uid);
this.record = record.createCopy();
this.channelID = channelID;
}
public StreamRecord setId() {
public StreamRecord addId() {
Random rnd = new Random();
uid.setValue(channelID + "-" + rnd.nextInt(1000));
record.setField(record.getNumFields() - 1, uid);
record.addField(uid);
return this;
}
......
......@@ -21,10 +21,10 @@ public abstract class StreamInvokable {
this.emittedRecords = emittedRecords;
}
public final void emit(Record record) {
StreamRecord streamRecord = new StreamRecord(record, channelID).setId();
public final void emit(Record record) {
for (RecordWriter<Record> output : outputs) {
try {
StreamRecord streamRecord = new StreamRecord(record, channelID).addId();
output.emit(streamRecord.getRecord());
emittedRecords.put(streamRecord.getId(), streamRecord);
......
......@@ -29,6 +29,7 @@ public class WordCountCounter extends UserTaskInvokable {
private StringValue wordValue = new StringValue("");
private IntValue countValue = new IntValue(1);
private String word = "";
private Record outputRecord = new Record(wordValue, countValue);
private int count = 1;
@Override
......@@ -41,7 +42,8 @@ public class WordCountCounter extends UserTaskInvokable {
count = wordCounts.get(word) + 1;
wordCounts.put(word, count);
countValue.setValue(count);
Record outputRecord = new Record(wordValue, countValue);
outputRecord.setField(0, wordValue);
outputRecord.setField(1, countValue);
emit(outputRecord);
} else {
......
......@@ -24,14 +24,15 @@ public class WordCountSplitter extends UserTaskInvokable {
private StringValue sentence = new StringValue("");
private String[] words = new String[0];
private StringValue wordValue = new StringValue("");
private Record outputRecord = new Record(wordValue);
@Override
public void invoke(Record record) throws Exception {
record.getFieldInto(0, sentence);
words = sentence.getValue().split(" ");
for (CharSequence word : words) {
wordValue.setValue(word);
Record outputRecord = new Record(wordValue);
outputRecord.setField(0, wordValue);
emit(outputRecord);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册