提交 7d82dd1f 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] StringRecord to Record migrate

上级 27f35d47
......@@ -2,11 +2,12 @@ package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record;
public class DefaultPartitioner implements ChannelSelector<IOReadableWritable> {
public class DefaultPartitioner implements ChannelSelector<Record> {
@Override
public int[] selectChannels(IOReadableWritable record, int numberOfOutputChannels) {
public int[] selectChannels(Record record, int numberOfOutputChannels) {
int[] returnChannels = new int[numberOfOutputChannels];
for(int i = 0; i < numberOfOutputChannels;i++) {
......
......@@ -21,13 +21,15 @@ import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class MyStream extends TestBase2 {
public static class InfoSource extends AbstractInputTask<RandIS> {
private RecordWriter<IOReadableWritable> output;
private Class<? extends ChannelSelector<IOReadableWritable>> Partitioner;
ChannelSelector<IOReadableWritable> partitioner;
private RecordWriter<Record> output;
private Class<? extends ChannelSelector<Record>> Partitioner;
ChannelSelector<Record> partitioner;
private Class<? extends UserSourceInvokable> UserFunction;
UserSourceInvokable userFunction;
......@@ -71,9 +73,7 @@ public class MyStream extends TestBase2 {
@Override
public void registerInputOutput() {
setClassInputs();
output = new RecordWriter<IOReadableWritable>(this,
IOReadableWritable.class, this.partitioner);
output = new RecordWriter<Record>(this, Record.class, this.partitioner);
}
@Override
......@@ -84,7 +84,7 @@ public class MyStream extends TestBase2 {
public static class QuerySource extends AbstractInputTask<RandIS> {
private RecordWriter<StringRecord> output;
private RecordWriter<Record> output;
@Override
public RandIS[] computeInputSplits(int requestedMinNumber) throws Exception {
......@@ -100,8 +100,7 @@ public class MyStream extends TestBase2 {
@Override
public void registerInputOutput() {
output = new RecordWriter<StringRecord>(this, StringRecord.class,
new StreamPartitioner());
output = new RecordWriter<Record>(this, Record.class, new StreamPartitioner());
}
@Override
......@@ -111,8 +110,8 @@ public class MyStream extends TestBase2 {
for (int i = 0; i < 5; i++) {
// output.emit(new
// StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000)+" 500"));
output.emit(new StringRecord("5 510 100"));
output.emit(new StringRecord("4 510 100"));
output.emit(new Record(new StringValue("5 510 100")));
output.emit(new Record(new StringValue("4 510 100")));
}
}
......@@ -121,18 +120,21 @@ public class MyStream extends TestBase2 {
public static class MySink extends AbstractOutputTask {
private RecordReader<StringRecord> input = null;
private RecordReader<Record> input = null;
@Override
public void registerInputOutput() {
this.input = new RecordReader<StringRecord>(this, StringRecord.class);
this.input = new RecordReader<Record>(this, Record.class);
}
@Override
public void invoke() throws Exception {
while (input.hasNext()) {
System.out.println(input.next().toString());
StringValue value = new StringValue("");
Record record = input.next();
record.getFieldInto(0, value);
System.out.println(value.getValue());
}
}
......@@ -229,8 +231,8 @@ public class MyStream extends TestBase2 {
graphBuilder.connect("querySource", "cellTask", ChannelType.INMEMORY);
graphBuilder.connect("cellTask", "sink", ChannelType.INMEMORY);
//return graphBuilder.getJobGraph();
return myJG;
return graphBuilder.getJobGraph();
//return myJG;
}
}
......@@ -2,15 +2,22 @@ package eu.stratosphere.streaming;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class StreamPartitioner implements ChannelSelector<StringRecord> {
public class StreamPartitioner implements ChannelSelector<Record> {
@Override
/*@Override
public int[] selectChannels(StringRecord record, int numberOfOutputChannels) {
// TODO Auto-generated method stub
int cellId = Integer.parseInt(record.toString().split(" ")[0]);
int cellId = Integer.parseInt(record.toString().split(" ")[0]);
return new int[]{cellId % numberOfOutputChannels};
}
}*/
@Override
public int[] selectChannels(Record record, int numberOfOutputChannels) {
StringValue value = new StringValue("");
record.getFieldInto(0, value);
int cellId = Integer.parseInt(value.getValue().split(" ")[0]);
return new int[]{cellId % numberOfOutputChannels};
}
}
......@@ -4,12 +4,13 @@ import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.types.Record;
public class StreamSource extends AbstractInputTask<RandIS> {
private RecordWriter<IOReadableWritable> output;
private Class<? extends ChannelSelector<IOReadableWritable>> Partitioner;
ChannelSelector<IOReadableWritable> partitioner;
private RecordWriter<Record> output;
private Class<? extends ChannelSelector<Record>> Partitioner;
ChannelSelector<Record> partitioner;
private Class<? extends UserSourceInvokable> UserFunction;
private UserSourceInvokable userFunction;
......@@ -53,16 +54,14 @@ public class StreamSource extends AbstractInputTask<RandIS> {
@Override
public void registerInputOutput() {
setClassInputs();
output = new RecordWriter<IOReadableWritable>(this,
IOReadableWritable.class, this.partitioner);
output = new RecordWriter<Record>(this,
Record.class, this.partitioner);
}
@Override
public void invoke() throws Exception {
userFunction.invoke(output);
}
}
......@@ -11,18 +11,19 @@ import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.Record;
public class StreamTask extends AbstractTask {
private RecordWriter<IOReadableWritable> output;
private Class<? extends ChannelSelector<IOReadableWritable>> Partitioner;
ChannelSelector<IOReadableWritable> partitioner;
private RecordWriter<Record> output;
private Class<? extends ChannelSelector<Record>> Partitioner;
ChannelSelector<Record> partitioner;
private Class<? extends UserTaskInvokable> UserFunction;
private UserTaskInvokable userFunction;
private RecordReader<IOReadableWritable> inputInfo = null;
private RecordReader<IOReadableWritable> inputQuery = null;
private RecordReader<Record> inputInfo = null;
private RecordReader<Record> inputQuery = null;
......@@ -55,19 +56,30 @@ public class StreamTask extends AbstractTask {
@Override
public void registerInputOutput() {
setClassInputs();
this.inputInfo = new RecordReader<IOReadableWritable>(this, IOReadableWritable.class);
this.inputQuery = new RecordReader<IOReadableWritable>(this, IOReadableWritable.class);
output = new RecordWriter<IOReadableWritable>(this, IOReadableWritable.class, this.partitioner);
this.inputInfo = new RecordReader<Record>(this, Record.class);
this.inputQuery = new RecordReader<Record>(this, Record.class);
output = new RecordWriter<Record>(this, Record.class, this.partitioner);
}
@Override
public void invoke() throws Exception {
List< RecordReader<IOReadableWritable>> inputs = new ArrayList< RecordReader<IOReadableWritable>>();
List< RecordReader<Record>> inputs = new ArrayList< RecordReader<Record>>();
inputs.add(inputInfo);
inputs.add(inputQuery);
userFunction.invoke(inputs,output);
boolean hasInput = true;
while (hasInput)
{
hasInput = false;
for (RecordReader<Record> input : inputs)
{
if (input.hasNext()) {
hasInput = true;
userFunction.invoke(input.next(), output);
}
}
}
}
......
......@@ -4,16 +4,18 @@ package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class TestSourceInvokable implements UserSourceInvokable {
@Override
public void invoke(RecordWriter<IOReadableWritable> output) throws Exception {
public void invoke(RecordWriter<Record> output) throws Exception {
for (int i = 0; i < 10; i++) {
// output.emit(new StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000)));
output.emit(new StringRecord("5 500"));
output.emit(new StringRecord("4 500"));
output.emit(new Record(new StringValue("5 500")));//new StringRecord("5 500"));
output.emit(new Record(new StringValue("4 500")));
}
}
......
......@@ -7,40 +7,61 @@ import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class TestTaskInvokable implements UserTaskInvokable {
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
@Override
public void invoke(List<RecordReader<IOReadableWritable>> inputs,
RecordWriter<IOReadableWritable> output) throws Exception {
RecordReader<IOReadableWritable> input1= inputs.get(0);
RecordReader<IOReadableWritable> input2= inputs.get(0);
while (input1.hasNext() && input2.hasNext()) {
String[] info = input1.next().toString().split(" ");
String[] query = input2.next().toString().split(" ");
engine.put(Integer.parseInt(info[0]), Long.parseLong(info[1]));
output.emit(new StringRecord(info[0] + " " + info[1]));
output.emit(new StringRecord(String.valueOf(engine.get(
Long.parseLong(query[1]), Long.parseLong(query[2]),
Integer.parseInt(query[0])))));
}
while (inputs.get(0).hasNext()) {
IOReadableWritable info = inputs.get(0).next();
output.emit(info);
}
while (inputs.get(1).hasNext()) {
IOReadableWritable query = inputs.get(1).next();
output.emit(query);
}
public void invoke(Record record,
RecordWriter<Record> output) throws Exception {
StringValue value = new StringValue();
record.getFieldInto(0, value);
String[] values = value.getValue().split(" ");
//INFO
if (values.length == 2)
{
engine.put(Integer.parseInt(values[0]), Long.parseLong(values[1]));
output.emit(new Record(new StringValue(values[0] + " " + values[1])));
}
//QUERY
else if (values.length == 3)
{
output.emit(new Record(new StringValue(String.valueOf(engine.get(
Long.parseLong(values[1]), Long.parseLong(values[2]),
Integer.parseInt(values[0]))))));
}
// RecordReader<IOReadableWritable> input1= inputs.get(0);
// RecordReader<IOReadableWritable> input2= inputs.get(0);
//
//
// while (input1.hasNext() && input2.hasNext()) {
// String[] info = input1.next().toString().split(" ");
// String[] query = input2.next().toString().split(" ");
//
// engine.put(Integer.parseInt(info[0]), Long.parseLong(info[1]));
//
// output.emit(new StringRecord(info[0] + " " + info[1]));
// output.emit(new StringRecord(String.valueOf(engine.get(
// Long.parseLong(query[1]), Long.parseLong(query[2]),
// Integer.parseInt(query[0])))));
// }
// while (inputs.get(0).hasNext()) {
//
// IOReadableWritable info = inputs.get(0).next();
//
// output.emit(info);
// }
// while (inputs.get(1).hasNext()) {
//
// IOReadableWritable query = inputs.get(1).next();
//
// output.emit(query);
// }
}
......
......@@ -2,7 +2,8 @@ package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.Record;
public interface UserSourceInvokable {
public void invoke(RecordWriter<IOReadableWritable> output) throws Exception ;
public void invoke(RecordWriter<Record> output) throws Exception ;
}
......@@ -5,10 +5,11 @@ import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.Record;
public interface UserTaskInvokable {
public void invoke(List<RecordReader<IOReadableWritable>> inputs,
RecordWriter<IOReadableWritable> output) throws Exception;
public void invoke(Record record,
RecordWriter<Record> output) throws Exception;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册