提交 0e2b44fc 编写于 作者: G gaborhermann 提交者: Stephan Ewen

[streaming] Ordered imports, migrated StringValues to IntValues and LongValues

上级 7d82dd1f
package eu.stratosphere.streaming; package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
......
...@@ -14,21 +14,20 @@ import eu.stratosphere.nephele.template.AbstractInputTask; ...@@ -14,21 +14,20 @@ import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractOutputTask; import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask; import eu.stratosphere.nephele.template.AbstractTask;
public class JobGraphBuilder { public class JobGraphBuilder {
private final JobGraph jobGraph; private final JobGraph jobGraph;
private Map<String, AbstractJobVertex> components; private Map<String, AbstractJobVertex> components;
public JobGraphBuilder(String jobGraphName) { public JobGraphBuilder(String jobGraphName) {
jobGraph = new JobGraph(jobGraphName); jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>(); components = new HashMap<String, AbstractJobVertex>();
} }
//TODO: Add source parallelism // TODO: Add source parallelism
public void setSource(String sourceName, final Class<? extends AbstractInputTask<?>> sourceClass) { public void setSource(String sourceName,
final Class<? extends AbstractInputTask<?>> sourceClass) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph); final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(sourceClass); source.setInputClass(sourceClass);
...@@ -36,7 +35,8 @@ public class JobGraphBuilder { ...@@ -36,7 +35,8 @@ public class JobGraphBuilder {
} }
public void setTask(String taskName, final Class<? extends AbstractTask> taskClass, int parallelism) { public void setTask(String taskName,
final Class<? extends AbstractTask> taskClass, int parallelism) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph); final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(taskClass); task.setTaskClass(taskClass);
...@@ -44,34 +44,32 @@ public class JobGraphBuilder { ...@@ -44,34 +44,32 @@ public class JobGraphBuilder {
components.put(taskName, task); components.put(taskName, task);
} }
public void setSink(String sinkName, final Class<? extends AbstractOutputTask> sinkClass) { public void setSink(String sinkName,
final Class<? extends AbstractOutputTask> sinkClass) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph); final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(sinkClass); sink.setOutputClass(sinkClass);
components.put(sinkName, sink); components.put(sinkName, sink);
} }
public void connect(String upStreamComponentName,
String downStreamComponentName, ChannelType channelType) {
public void connect(String upStreamComponentName, String downStreamComponentName, ChannelType channelType) { AbstractJobVertex upStreamComponent = null;
AbstractJobVertex downStreamComponent = null;
AbstractJobVertex upStreamComponent=null;
AbstractJobVertex downStreamComponent=null;
upStreamComponent = components.get(upStreamComponentName); upStreamComponent = components.get(upStreamComponentName);
downStreamComponent = components.get(downStreamComponentName); downStreamComponent = components.get(downStreamComponentName);
try { try {
upStreamComponent.connectTo(downStreamComponent, channelType); upStreamComponent.connectTo(downStreamComponent, channelType);
} } catch (JobGraphDefinitionException e) {
catch (JobGraphDefinitionException e) {
e.printStackTrace(); e.printStackTrace();
} }
} }
public JobGraph getJobGraph() { public JobGraph getJobGraph() {
return jobGraph; return jobGraph;
} }
} }
package eu.stratosphere.streaming; package eu.stratosphere.streaming;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
import java.util.Random;
import eu.stratosphere.configuration.Configuration; import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord; import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordReader;
...@@ -20,7 +15,10 @@ import eu.stratosphere.nephele.template.AbstractInputTask; ...@@ -20,7 +15,10 @@ import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractOutputTask; import eu.stratosphere.nephele.template.AbstractOutputTask;
import eu.stratosphere.nephele.template.AbstractTask; import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.pact.runtime.task.util.TaskConfig; import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
import eu.stratosphere.test.util.TestBase2; import eu.stratosphere.test.util.TestBase2;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue; import eu.stratosphere.types.StringValue;
...@@ -105,13 +103,22 @@ public class MyStream extends TestBase2 { ...@@ -105,13 +103,22 @@ public class MyStream extends TestBase2 {
@Override @Override
public void invoke() throws Exception { public void invoke() throws Exception {
Random rnd = new Random();
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
// output.emit(new Record record1 = new Record(3);
// StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000)+" 500")); record1.setField(0, new IntValue(5));
output.emit(new Record(new StringValue("5 510 100"))); record1.setField(1, new LongValue(510));
output.emit(new Record(new StringValue("4 510 100"))); record1.setField(2, new LongValue(100));
Record record2 = new Record(3);
record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(510));
record1.setField(2, new LongValue(100));
output.emit(record1);
output.emit(record2);
// output.emit(new Record(new StringValue("5 510 100")));
// output.emit(new Record(new StringValue("4 510 100")));
} }
} }
...@@ -186,6 +193,16 @@ public class MyStream extends TestBase2 { ...@@ -186,6 +193,16 @@ public class MyStream extends TestBase2 {
} }
private class QuerySourceInvokable implements UserSourceInvokable {
@Override
public void invoke(RecordWriter<Record> output) throws Exception {
for (int i = 0; i < 5; i++) {
output.emit(new Record(new StringValue("5 510 100")));
output.emit(new Record(new StringValue("4 510 100")));
}
}
}
@Override @Override
public JobGraph getJobGraph() { public JobGraph getJobGraph() {
...@@ -196,6 +213,7 @@ public class MyStream extends TestBase2 { ...@@ -196,6 +213,7 @@ public class MyStream extends TestBase2 {
TaskConfig tConfig = new TaskConfig(infoSource.getConfiguration()); TaskConfig tConfig = new TaskConfig(infoSource.getConfiguration());
Configuration config = tConfig.getConfiguration(); Configuration config = tConfig.getConfiguration();
config.setClass("partitioner", StreamPartitioner.class); config.setClass("partitioner", StreamPartitioner.class);
config.setClass("querySourceInvokable", QuerySourceInvokable.class);
infoSource.setInputClass(InfoSource.class); infoSource.setInputClass(InfoSource.class);
final JobInputVertex querySource = new JobInputVertex("MyQuerySource", myJG); final JobInputVertex querySource = new JobInputVertex("MyQuerySource", myJG);
......
package eu.stratosphere.streaming; package eu.stratosphere.streaming;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class StreamPartitioner implements ChannelSelector<Record> { public class StreamPartitioner implements ChannelSelector<Record> {
/*@Override
public int[] selectChannels(StringRecord record, int numberOfOutputChannels) {
int cellId = Integer.parseInt(record.toString().split(" ")[0]);
return new int[]{cellId % numberOfOutputChannels};
}*/
@Override @Override
public int[] selectChannels(Record record, int numberOfOutputChannels) { public int[] selectChannels(Record record, int numberOfOutputChannels) {
StringValue value = new StringValue(""); IntValue value = new IntValue();
record.getFieldInto(0, value); record.getFieldInto(0, value);
int cellId = Integer.parseInt(value.getValue().split(" ")[0]); int cellId = value.getValue();
return new int[]{cellId % numberOfOutputChannels}; return new int[]{cellId % numberOfOutputChannels};
} }
} }
package eu.stratosphere.streaming; package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask; import eu.stratosphere.nephele.template.AbstractInputTask;
...@@ -34,29 +33,26 @@ public class StreamSource extends AbstractInputTask<RandIS> { ...@@ -34,29 +33,26 @@ public class StreamSource extends AbstractInputTask<RandIS> {
} }
private void setClassInputs() { private void setClassInputs() {
Partitioner = getTaskConfiguration().getClass("partitioner", DefaultPartitioner.class,ChannelSelector.class); Partitioner = getTaskConfiguration().getClass("partitioner",
DefaultPartitioner.class, ChannelSelector.class);
try { try {
partitioner = Partitioner.newInstance(); partitioner = Partitioner.newInstance();
} catch (Exception e) { } catch (Exception e) {
} }
UserFunction = getTaskConfiguration().getClass("userfunction", TestSourceInvokable.class,UserSourceInvokable.class); UserFunction = getTaskConfiguration().getClass("userfunction",
try TestSourceInvokable.class, UserSourceInvokable.class);
{ try {
userFunction = UserFunction.newInstance(); userFunction = UserFunction.newInstance();
} catch (Exception e) } catch (Exception e) {
{
} }
} }
@Override @Override
public void registerInputOutput() { public void registerInputOutput() {
setClassInputs(); setClassInputs();
output = new RecordWriter<Record>(this, output = new RecordWriter<Record>(this, Record.class, this.partitioner);
Record.class, this.partitioner);
} }
@Override @Override
......
...@@ -3,14 +3,10 @@ package eu.stratosphere.streaming; ...@@ -3,14 +3,10 @@ package eu.stratosphere.streaming;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.ChannelSelector; import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader; import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractTask; import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
public class StreamTask extends AbstractTask { public class StreamTask extends AbstractTask {
...@@ -21,12 +17,9 @@ public class StreamTask extends AbstractTask { ...@@ -21,12 +17,9 @@ public class StreamTask extends AbstractTask {
private Class<? extends UserTaskInvokable> UserFunction; private Class<? extends UserTaskInvokable> UserFunction;
private UserTaskInvokable userFunction; private UserTaskInvokable userFunction;
private RecordReader<Record> inputInfo = null; private RecordReader<Record> inputInfo = null;
private RecordReader<Record> inputQuery = null; private RecordReader<Record> inputQuery = null;
public StreamTask() { public StreamTask() {
Partitioner = null; Partitioner = null;
UserFunction = null; UserFunction = null;
...@@ -34,20 +27,19 @@ public class StreamTask extends AbstractTask { ...@@ -34,20 +27,19 @@ public class StreamTask extends AbstractTask {
userFunction = null; userFunction = null;
} }
private void setClassInputs() { private void setClassInputs() {
Partitioner = getTaskConfiguration().getClass("partitioner", DefaultPartitioner.class,ChannelSelector.class); Partitioner = getTaskConfiguration().getClass("partitioner",
DefaultPartitioner.class, ChannelSelector.class);
try { try {
partitioner = Partitioner.newInstance(); partitioner = Partitioner.newInstance();
} catch (Exception e) { } catch (Exception e) {
} }
UserFunction = getTaskConfiguration().getClass("userfunction", TestTaskInvokable.class,UserTaskInvokable.class); UserFunction = getTaskConfiguration().getClass("userfunction",
try TestTaskInvokable.class, UserTaskInvokable.class);
{ try {
userFunction = UserFunction.newInstance(); userFunction = UserFunction.newInstance();
} catch (Exception e) } catch (Exception e) {
{
} }
...@@ -64,16 +56,14 @@ public class StreamTask extends AbstractTask { ...@@ -64,16 +56,14 @@ public class StreamTask extends AbstractTask {
@Override @Override
public void invoke() throws Exception { public void invoke() throws Exception {
List< RecordReader<Record>> inputs = new ArrayList< RecordReader<Record>>(); List<RecordReader<Record>> inputs = new ArrayList<RecordReader<Record>>();
inputs.add(inputInfo); inputs.add(inputInfo);
inputs.add(inputQuery); inputs.add(inputQuery);
boolean hasInput = true; boolean hasInput = true;
while (hasInput) while (hasInput) {
{
hasInput = false; hasInput = false;
for (RecordReader<Record> input : inputs) for (RecordReader<Record> input : inputs) {
{
if (input.hasNext()) { if (input.hasNext()) {
hasInput = true; hasInput = true;
userFunction.invoke(input.next(), output); userFunction.invoke(input.next(), output);
......
package eu.stratosphere.streaming; package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
public class TestSourceInvokable implements UserSourceInvokable { public class TestSourceInvokable implements UserSourceInvokable {
@Override @Override
public void invoke(RecordWriter<Record> output) throws Exception { public void invoke(RecordWriter<Record> output) throws Exception {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
// output.emit(new StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000))); Record record1 = new Record(2);
output.emit(new Record(new StringValue("5 500")));//new StringRecord("5 500")); record1.setField(0, new IntValue(5));
output.emit(new Record(new StringValue("4 500"))); record1.setField(1, new LongValue(500));
Record record2 = new Record(2);
record2.setField(0, new IntValue(4));
record2.setField(1, new LongValue(500));
output.emit(record1);
output.emit(record2);
} }
} }
......
package eu.stratosphere.streaming; package eu.stratosphere.streaming;
import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact; import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue; import eu.stratosphere.types.StringValue;
...@@ -15,55 +12,25 @@ public class TestTaskInvokable implements UserTaskInvokable { ...@@ -15,55 +12,25 @@ public class TestTaskInvokable implements UserTaskInvokable {
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0); private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
@Override @Override
public void invoke(Record record, public void invoke(Record record, RecordWriter<Record> output)
RecordWriter<Record> output) throws Exception { throws Exception {
StringValue value = new StringValue(); IntValue value1 = new IntValue(0);
record.getFieldInto(0, value); record.getFieldInto(0, value1);
String[] values = value.getValue().split(" "); LongValue value2 = new LongValue(0);
record.getFieldInto(1, value2);
//INFO
if (values.length == 2) // INFO
{ if (record.getNumFields() == 2) {
engine.put(Integer.parseInt(values[0]), Long.parseLong(values[1])); engine.put(value1.getValue(), value2.getValue());
output.emit(new Record(new StringValue(values[0] + " " + values[1]))); output.emit(new Record(new StringValue(value1 + " " + value2)));
} }
//QUERY // QUERY
else if (values.length == 3) else if (record.getNumFields() == 3) {
{ LongValue value3 = new LongValue(0);
record.getFieldInto(2, value3);
output.emit(new Record(new StringValue(String.valueOf(engine.get( output.emit(new Record(new StringValue(String.valueOf(engine.get(
Long.parseLong(values[1]), Long.parseLong(values[2]), value2.getValue(), value3.getValue(), value1.getValue())))));
Integer.parseInt(values[0]))))));
} }
// RecordReader<IOReadableWritable> input1= inputs.get(0);
// RecordReader<IOReadableWritable> input2= inputs.get(0);
//
//
// while (input1.hasNext() && input2.hasNext()) {
// String[] info = input1.next().toString().split(" ");
// String[] query = input2.next().toString().split(" ");
//
// engine.put(Integer.parseInt(info[0]), Long.parseLong(info[1]));
//
// output.emit(new StringRecord(info[0] + " " + info[1]));
// output.emit(new StringRecord(String.valueOf(engine.get(
// Long.parseLong(query[1]), Long.parseLong(query[2]),
// Integer.parseInt(query[0])))));
// }
// while (inputs.get(0).hasNext()) {
//
// IOReadableWritable info = inputs.get(0).next();
//
// output.emit(info);
// }
// while (inputs.get(1).hasNext()) {
//
// IOReadableWritable query = inputs.get(1).next();
//
// output.emit(query);
// }
} }
} }
package eu.stratosphere.streaming; package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
......
package eu.stratosphere.streaming; package eu.stratosphere.streaming;
import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter; import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.Record; import eu.stratosphere.types.Record;
...@@ -11,5 +7,4 @@ public interface UserTaskInvokable { ...@@ -11,5 +7,4 @@ public interface UserTaskInvokable {
public void invoke(Record record, public void invoke(Record record,
RecordWriter<Record> output) throws Exception; RecordWriter<Record> output) throws Exception;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册