提交 d5f2d7fa 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] StreamRecord created

上级 cf867f51
package eu.stratosphere.streaming.api;
import java.util.Random;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
//TODO: refactor access modifiers
public class StreamRecord {
private Record record;
public StreamRecord(Record record) {
this.record = record;
}
public StreamRecord addId() {
Random rand = new Random();
record.addField(new LongValue(rand.nextLong()));
return this;
}
public Long popId() {
LongValue id = new LongValue();
record.getFieldInto(record.getNumFields() - 1, id);
record.removeField(record.getNumFields() - 1);
return id.getValue();
}
public Record getRecord() {
return record;
}
}
package eu.stratosphere.streaming.api;
import java.util.UUID;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
public final class StreamRecordProvider {
public static Long addUUID(Record rec) {
UUID uuid = UUID.randomUUID();
rec.addField(new LongValue(uuid.getMostSignificantBits()));
return uuid.getLeastSignificantBits();
}
public static Long popUUID(Record rec) {
LongValue mostSignificantBits = new LongValue();
rec.getFieldInto(rec.getNumFields() - 1, mostSignificantBits);
rec.removeField(rec.getNumFields() - 1);
return mostSignificantBits.getValue();
}
}
......@@ -127,12 +127,11 @@ public class StreamTask extends AbstractTask {
for (RecordReader<Record> input : inputs) {
if (input.hasNext()) {
hasInput = true;
Record record = input.next();
Long id = StreamRecordProvider.popUUID(record);
StreamRecord streamRecord = new StreamRecord(input.next());
Long id = streamRecord.popId();
userFunction.invoke(record);
//TODO: ack here
userFunction.invoke(streamRecord.getRecord());
//TODO: ack here
}
}
}
......
......@@ -3,7 +3,7 @@ package eu.stratosphere.streaming.api.invokable;
import java.util.List;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.StreamRecordProvider;
import eu.stratosphere.streaming.api.StreamRecord;
import eu.stratosphere.types.Record;
public abstract class StreamInvokable {
......@@ -17,9 +17,8 @@ public abstract class StreamInvokable {
public final void emit(Record record) {
for (RecordWriter<Record> output : outputs) {
try {
//TODO: use this id for acking
Long uuid = StreamRecordProvider.addUUID(record);
output.emit(record);
StreamRecord streamRecord = new StreamRecord(record).addId();
output.emit(record);
} catch (Exception e) {
System.out.println("Emit error");
}
......
......@@ -26,16 +26,16 @@ public class WordCount extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WorCountSource", WordCountSource.class);
graphBuilder.setTask("WorCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WorCountCounter", WordCountCounter.class, 2);
graphBuilder.setSource("WordCountSource", WordCountSource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.fieldsConnect("WorCountSource", "WorCountSplitter", 0,
graphBuilder.fieldsConnect("WordCountSource", "WordCountSplitter", 0,
StringValue.class);
graphBuilder.fieldsConnect("WorCountSplitter", "WorCountCounter", 0,
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0,
StringValue.class);
graphBuilder.broadcastConnect("WorCountCounter", "WordCountSink");
graphBuilder.broadcastConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册