提交 93354e92 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Started implementing FlatStreamTask

上级 486d7e0c
package eu.stratosphere.streaming.api;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
public class FlatStreamRecord {
private long id;
private Record record;
private int numberOfFields;
public FlatStreamRecord() {
id = 94;
}
public FlatStreamRecord(int numberOfFields) {
this();
this.numberOfFields = numberOfFields;
record = new Record(numberOfFields + 1);
LongValue idValue = new LongValue(id);
record.addField(idValue);
}
public FlatStreamRecord(Record record) {
this();
this.numberOfFields = record.getNumFields();
this.record = record;
LongValue idValue = new LongValue(id);
record.addField(idValue);
}
public int getNumFields() {
return this.numberOfFields;
}
public boolean getFieldInto(int fieldNum, Value target) {
return record.getFieldInto(fieldNum, target);
}
public <T extends Value> T getField(final int fieldNum, final Class<T> type) {
return record.getField(fieldNum, type);
}
//TODO: set to private or package private, this cannot be seen from outside!
public Record getRecord() {
return this.record;
}
}
...@@ -68,7 +68,7 @@ public class StreamSink extends AbstractOutputTask { ...@@ -68,7 +68,7 @@ public class StreamSink extends AbstractOutputTask {
for (RecordReader<Record> input : inputs) { for (RecordReader<Record> input : inputs) {
if (input.hasNext()) { if (input.hasNext()) {
hasInput = true; hasInput = true;
userFunction.invoke(input.next(), input); userFunction.invoke(input.next());
} }
} }
} }
......
...@@ -56,8 +56,7 @@ public class StreamSource extends AbstractInputTask<RandIS> { ...@@ -56,8 +56,7 @@ public class StreamSource extends AbstractInputTask<RandIS> {
private void setConfigInputs() { private void setConfigInputs() {
numberOfOutputs = getTaskConfiguration().getInteger("numberOfOutputs", numberOfOutputs = getTaskConfiguration().getInteger("numberOfOutputs", 0);
0);
Class<? extends ChannelSelector<Record>> partitioner; Class<? extends ChannelSelector<Record>> partitioner;
for (int i = 1; i <= numberOfOutputs; i++) { for (int i = 1; i <= numberOfOutputs; i++) {
...@@ -73,12 +72,18 @@ public class StreamSource extends AbstractInputTask<RandIS> { ...@@ -73,12 +72,18 @@ public class StreamSource extends AbstractInputTask<RandIS> {
} }
} }
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(this, Record.class,
outputPartitioner));
}
Class<? extends UserSourceInvokable> userFunctionClass; Class<? extends UserSourceInvokable> userFunctionClass;
userFunctionClass = getTaskConfiguration().getClass("userfunction", userFunctionClass = getTaskConfiguration().getClass("userfunction",
DefaultSourceInvokable.class, UserSourceInvokable.class); DefaultSourceInvokable.class, UserSourceInvokable.class);
try { try {
userFunction = userFunctionClass.newInstance(); userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs);
} catch (Exception e) { } catch (Exception e) {
} }
...@@ -88,18 +93,14 @@ public class StreamSource extends AbstractInputTask<RandIS> { ...@@ -88,18 +93,14 @@ public class StreamSource extends AbstractInputTask<RandIS> {
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
setConfigInputs(); setConfigInputs();
for (ChannelSelector<Record> partitioner : partitioners) {
outputs.add(new RecordWriter<Record>(this, Record.class,
partitioner));
}
} }
@Override @Override
public void invoke() throws Exception { public void invoke() throws Exception {
for (RecordWriter<Record> output : outputs) { userFunction.invoke();
userFunction.invoke(output);
}
} }
} }
...@@ -26,6 +26,7 @@ import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable; ...@@ -26,6 +26,7 @@ import eu.stratosphere.streaming.api.invokable.DefaultTaskInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.partitioner.DefaultPartitioner; import eu.stratosphere.streaming.partitioner.DefaultPartitioner;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class StreamTask extends AbstractTask { public class StreamTask extends AbstractTask {
...@@ -67,11 +68,17 @@ public class StreamTask extends AbstractTask { ...@@ -67,11 +68,17 @@ public class StreamTask extends AbstractTask {
} }
} }
for (ChannelSelector<Record> outputPartitioner : partitioners) {
outputs.add(new RecordWriter<Record>(this, Record.class,
outputPartitioner));
}
Class<? extends UserTaskInvokable> userFunctionClass; Class<? extends UserTaskInvokable> userFunctionClass;
userFunctionClass = getTaskConfiguration().getClass("userfunction", userFunctionClass = getTaskConfiguration().getClass("userfunction",
DefaultTaskInvokable.class, UserTaskInvokable.class); DefaultTaskInvokable.class, UserTaskInvokable.class);
try { try {
userFunction = userFunctionClass.newInstance(); userFunction = userFunctionClass.newInstance();
userFunction.declareOutputs(outputs);
} catch (Exception e) { } catch (Exception e) {
} }
...@@ -80,12 +87,10 @@ public class StreamTask extends AbstractTask { ...@@ -80,12 +87,10 @@ public class StreamTask extends AbstractTask {
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
setConfigInputs(); setConfigInputs();
for (ChannelSelector<Record> partitioner : partitioners) { Record r = new Record();
outputs.add(new RecordWriter<Record>(this, Record.class, partitioner)); r.addField(new StringValue(""));
}
} }
// TODO: Performance with multiple outputs
@Override @Override
public void invoke() throws Exception { public void invoke() throws Exception {
boolean hasInput = true; boolean hasInput = true;
...@@ -94,9 +99,7 @@ public class StreamTask extends AbstractTask { ...@@ -94,9 +99,7 @@ public class StreamTask extends AbstractTask {
for (RecordReader<Record> input : inputs) { for (RecordReader<Record> input : inputs) {
if (input.hasNext()) { if (input.hasNext()) {
hasInput = true; hasInput = true;
for (RecordWriter<Record> output : outputs) { userFunction.invoke(new FlatStreamRecord(input.next()));
userFunction.invoke(input.next(), output);
}
} }
} }
} }
......
...@@ -15,15 +15,13 @@ ...@@ -15,15 +15,13 @@
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue; import eu.stratosphere.types.StringValue;
public class DefaultSinkInvokable implements UserSinkInvokable { public class DefaultSinkInvokable implements UserSinkInvokable {
@Override @Override
public void invoke(Record record, RecordReader<Record> input) public void invoke(Record record) throws Exception {
throws Exception {
StringValue value = new StringValue(""); StringValue value = new StringValue("");
record.getFieldInto(0, value); record.getFieldInto(0, value);
System.out.println(value.getValue()); System.out.println(value.getValue());
......
...@@ -15,19 +15,19 @@ ...@@ -15,19 +15,19 @@
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue; import eu.stratosphere.types.StringValue;
public class DefaultSourceInvokable implements UserSourceInvokable { public class DefaultSourceInvokable extends UserSourceInvokable {
private String motto = "Stratosphere -- Big Data looks tiny from here"; private String motto = "Stratosphere -- Big Data looks tiny from here";
private String[] mottoArray = motto.split(" "); private String[] mottoArray = motto.split(" ");
@Override @Override
public void invoke(RecordWriter<Record> output) throws Exception { public void invoke() throws Exception {
for (CharSequence word : mottoArray) { for (CharSequence word : mottoArray) {
output.emit(new Record(new StringValue(word))); emit(new FlatStreamRecord(new Record(new StringValue(word))));
} }
} }
......
...@@ -15,15 +15,12 @@ ...@@ -15,15 +15,12 @@
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.types.Record;
public class DefaultTaskInvokable implements UserTaskInvokable { public class DefaultTaskInvokable extends UserTaskInvokable {
@Override @Override
public void invoke(Record record, RecordWriter<Record> output) public void invoke(FlatStreamRecord streamRecord) throws Exception {
throws Exception { emit(streamRecord);
output.emit(record);
} }
} }
\ No newline at end of file
package eu.stratosphere.streaming.api.invokable;
import java.util.List;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.types.Record;
public abstract class StreamInvokable {
private List<RecordWriter<Record>> outputs;
public final void declareOutputs(List<RecordWriter<Record>> outputs) {
this.outputs = outputs;
}
public final void emit(FlatStreamRecord streamRecord) {
for (RecordWriter<Record> output : outputs) {
try {
output.emit(streamRecord.getRecord());
} catch (Exception e) {
System.out.println("Emit error");
}
}
}
}
...@@ -15,11 +15,9 @@ ...@@ -15,11 +15,9 @@
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
public interface UserSinkInvokable { public interface UserSinkInvokable {
public void invoke(Record record, public void invoke(Record record) throws Exception;
RecordReader<Record> input) throws Exception;
} }
\ No newline at end of file
...@@ -15,9 +15,9 @@ ...@@ -15,9 +15,9 @@
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.nephele.io.RecordWriter; public abstract class UserSourceInvokable extends StreamInvokable {
import eu.stratosphere.types.Record;
public interface UserSourceInvokable { public void invoke() throws Exception {
public void invoke(RecordWriter<Record> output) throws Exception ;
}
} }
...@@ -15,11 +15,11 @@ ...@@ -15,11 +15,11 @@
package eu.stratosphere.streaming.api.invokable; package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.types.Record;
public interface UserTaskInvokable { public abstract class UserTaskInvokable extends StreamInvokable {
public void invoke(Record record, public void invoke(FlatStreamRecord record) throws Exception {
RecordWriter<Record> output) throws Exception;
}
} }
...@@ -15,16 +15,16 @@ ...@@ -15,16 +15,16 @@
package eu.stratosphere.streaming.test; package eu.stratosphere.streaming.test;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue; import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue; import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
public class QuerySourceInvokable implements UserSourceInvokable { public class QuerySourceInvokable extends UserSourceInvokable {
@Override @Override
public void invoke(RecordWriter<Record> output) throws Exception { public void invoke() throws Exception {
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
Record record1 = new Record(3); Record record1 = new Record(3);
record1.setField(0, new IntValue(5)); record1.setField(0, new IntValue(5));
...@@ -34,10 +34,10 @@ public class QuerySourceInvokable implements UserSourceInvokable { ...@@ -34,10 +34,10 @@ public class QuerySourceInvokable implements UserSourceInvokable {
Record record2 = new Record(3); Record record2 = new Record(3);
record2.setField(0, new IntValue(4)); record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(510)); record2.setField(1, new LongValue(510));
record1.setField(2, new LongValue(100)); record2.setField(2, new LongValue(100));
output.emit(record1); emit(new FlatStreamRecord(record1));
output.emit(record2); emit(new FlatStreamRecord(record2));
} }
} }
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
package eu.stratosphere.streaming.test; package eu.stratosphere.streaming.test;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable; import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue; import eu.stratosphere.types.StringValue;
...@@ -23,8 +22,7 @@ import eu.stratosphere.types.StringValue; ...@@ -23,8 +22,7 @@ import eu.stratosphere.types.StringValue;
public class TestSinkInvokable implements UserSinkInvokable { public class TestSinkInvokable implements UserSinkInvokable {
@Override @Override
public void invoke(Record record, RecordReader<Record> input) public void invoke(Record record) throws Exception {
throws Exception {
StringValue value = new StringValue(""); StringValue value = new StringValue("");
record.getFieldInto(0, value); record.getFieldInto(0, value);
......
...@@ -15,17 +15,16 @@ ...@@ -15,17 +15,16 @@
package eu.stratosphere.streaming.test; package eu.stratosphere.streaming.test;
import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable; import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.IntValue; import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue; import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
public class TestSourceInvokable implements UserSourceInvokable { public class TestSourceInvokable extends UserSourceInvokable {
@Override @Override
public void invoke(RecordWriter<Record> output) throws Exception { public void invoke() throws Exception {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
Record record1 = new Record(2); Record record1 = new Record(2);
record1.setField(0, new IntValue(5)); record1.setField(0, new IntValue(5));
...@@ -33,8 +32,8 @@ public class TestSourceInvokable implements UserSourceInvokable { ...@@ -33,8 +32,8 @@ public class TestSourceInvokable implements UserSourceInvokable {
Record record2 = new Record(2); Record record2 = new Record(2);
record2.setField(0, new IntValue(4)); record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(500)); record2.setField(1, new LongValue(500));
output.emit(record1); emit(new FlatStreamRecord(record1));
output.emit(record2); emit(new FlatStreamRecord(record2));
} }
} }
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
package eu.stratosphere.streaming.test; package eu.stratosphere.streaming.test;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.streaming.api.FlatStreamRecord;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable; import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.test.cellinfo.WorkerEngineExact; import eu.stratosphere.streaming.test.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.IntValue; import eu.stratosphere.types.IntValue;
...@@ -23,13 +23,12 @@ import eu.stratosphere.types.LongValue; ...@@ -23,13 +23,12 @@ import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue; import eu.stratosphere.types.StringValue;
public class TestTaskInvokable implements UserTaskInvokable { public class TestTaskInvokable extends UserTaskInvokable {
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0); private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
@Override @Override
public void invoke(Record record, RecordWriter<Record> output) public void invoke(FlatStreamRecord record) throws Exception {
throws Exception {
IntValue value1 = new IntValue(0); IntValue value1 = new IntValue(0);
record.getFieldInto(0, value1); record.getFieldInto(0, value1);
LongValue value2 = new LongValue(0); LongValue value2 = new LongValue(0);
...@@ -38,15 +37,14 @@ public class TestTaskInvokable implements UserTaskInvokable { ...@@ -38,15 +37,14 @@ public class TestTaskInvokable implements UserTaskInvokable {
// INFO // INFO
if (record.getNumFields() == 2) { if (record.getNumFields() == 2) {
engine.put(value1.getValue(), value2.getValue()); engine.put(value1.getValue(), value2.getValue());
output.emit(new Record(new StringValue(value1 + " " + value2))); emit(new FlatStreamRecord(new Record(new StringValue(value1 + " " + value2))));
} }
// QUERY // QUERY
else if (record.getNumFields() == 3) { else if (record.getNumFields() == 3) {
LongValue value3 = new LongValue(0); LongValue value3 = new LongValue(0);
record.getFieldInto(2, value3); record.getFieldInto(2, value3);
emit(new FlatStreamRecord(new Record(new StringValue(String.valueOf(engine.get(
output.emit(new Record(new StringValue(String.valueOf(engine.get( value2.getValue(), value3.getValue(), value1.getValue()))))));
value2.getValue(), value3.getValue(), value1.getValue())))));
} }
} }
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册