提交 27f35d47 编写于 作者: G Gyula Fora 提交者: Stephan Ewen

[streaming] Source and Task interface updated

上级 a0d9ef51
......@@ -10,7 +10,7 @@ public class DefaultPartitioner implements ChannelSelector<IOReadableWritable>
int[] returnChannels = new int[numberOfOutputChannels];
for(int i = 0; i < numberOfOutputChannels;i++) {
returnChannels[i]=i+1;
returnChannels[i]=i;
}
return returnChannels;
}
......
package eu.stratosphere.streaming;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
import eu.stratosphere.configuration.Configuration;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Random;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.InputSplit;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
......@@ -29,211 +23,214 @@ import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.test.util.TestBase2;
public class MyStream extends TestBase2 {
public static class InfoSource extends AbstractInputTask<RandIS> {
private RecordWriter<IOReadableWritable> output;
private Class<? extends ChannelSelector<IOReadableWritable>> Partitioner;
ChannelSelector<IOReadableWritable> partitioner;
private Class<? extends UserSourceInvokable> UserFunction;
UserSourceInvokable userFunction;
public InfoSource() {
Partitioner = null;
UserFunction = null;
partitioner = null;
userFunction = null;
}
@Override
public RandIS[] computeInputSplits(int requestedMinNumber) throws Exception {
// TODO Auto-generated method stub
return null;
}
@Override
public Class<RandIS> getInputSplitType() {
// TODO Auto-generated method stub
return null;
}
private void setClassInputs() {
Partitioner = getTaskConfiguration().getClass("partitioner", DefaultPartitioner.class,ChannelSelector.class);
try {
partitioner = Partitioner.newInstance();
} catch (Exception e) {
}
UserFunction = getTaskConfiguration().getClass("userfunction", TestSourceInvokable.class,UserSourceInvokable.class);
try
{
userFunction = UserFunction.newInstance();
} catch (Exception e)
{
}
}
@Override
public void registerInputOutput() {
setClassInputs();
output = new RecordWriter<IOReadableWritable>(this,
IOReadableWritable.class, this.partitioner);
}
@Override
public void invoke() throws Exception {
userFunction.invoke(output);
}
}
public static class QuerySource extends AbstractInputTask<RandIS> {
private RecordWriter<StringRecord> output;
@Override
public RandIS[] computeInputSplits(int requestedMinNumber) throws Exception {
// TODO Auto-generated method stub
return null;
}
@Override
public Class<RandIS> getInputSplitType() {
// TODO Auto-generated method stub
return null;
}
@Override
public void registerInputOutput() {
output = new RecordWriter<StringRecord>(this, StringRecord.class,
new StreamPartitioner());
}
@Override
public void invoke() throws Exception {
Random rnd = new Random();
for (int i = 0; i < 5; i++) {
// output.emit(new
// StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000)+" 500"));
output.emit(new StringRecord("5 510 100"));
output.emit(new StringRecord("4 510 100"));
}
}
}
public static class MySink extends AbstractOutputTask {
private RecordReader<StringRecord> input = null;
@Override
public void registerInputOutput() {
this.input = new RecordReader<StringRecord>(this, StringRecord.class);
}
@Override
public void invoke() throws Exception {
while (input.hasNext()) {
System.out.println(input.next().toString());
}
}
}
public static class MyStreamMap extends AbstractTask {
private RecordReader<StringRecord> inputInfo = null;
private RecordReader<StringRecord> inputQuery = null;
private RecordWriter<StringRecord> output = null;
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
@Override
public void invoke() throws Exception {
while (this.inputInfo.hasNext() && this.inputQuery.hasNext()) {
String[] info = inputInfo.next().toString().split(" ");
String[] query = inputQuery.next().toString().split(" ");
engine.put(Integer.parseInt(info[0]), Long.parseLong(info[1]));
this.output.emit(new StringRecord(info[0] + " " + info[1]));
this.output.emit(new StringRecord(String.valueOf(engine.get(
Long.parseLong(query[1]), Long.parseLong(query[2]),
Integer.parseInt(query[0])))));
}
while (this.inputInfo.hasNext()) {
StringRecord info = inputInfo.next();
this.output.emit(info);
}
while (this.inputQuery.hasNext()) {
StringRecord query = inputQuery.next();
this.output.emit(query);
}
}
@Override
public void registerInputOutput() {
this.inputInfo = new RecordReader<StringRecord>(this, StringRecord.class);
this.inputQuery = new RecordReader<StringRecord>(this, StringRecord.class);
this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
}
}
@Override
public JobGraph getJobGraph() {
final JobGraph myJG = new JobGraph("MyStream");
// SOURCE
final JobInputVertex infoSource = new JobInputVertex("MyInfoSource", myJG);
TaskConfig tConfig = new TaskConfig(infoSource.getConfiguration());
Configuration config = tConfig.getConfiguration();
config.setClass("partitioner", StreamPartitioner.class);
infoSource.setInputClass(InfoSource.class);
final JobInputVertex querySource = new JobInputVertex("MyQuerySource", myJG);
// final TaskConfig config = new TaskConfig(querySource.getConfiguration());
querySource.setInputClass(QuerySource.class);
// TASK
final JobTaskVertex task1 = new JobTaskVertex("MyTask1", myJG);
task1.setTaskClass(MyStreamMap.class);
task1.setNumberOfSubtasks(2);
// SINK
final JobOutputVertex sink = new JobOutputVertex("MySink", myJG);
// final TaskConfig config = new TaskConfig(sink.getConfiguration());
sink.setOutputClass(MySink.class);
try {
infoSource.connectTo(task1, ChannelType.INMEMORY);
querySource.connectTo(task1, ChannelType.INMEMORY);
task1.connectTo(sink, ChannelType.INMEMORY);
} catch (JobGraphDefinitionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", StreamSource.class);
graphBuilder.setSource("querySource", QuerySource.class);
graphBuilder.setTask("cellTask", StreamTask.class, 2);
graphBuilder.setSink("sink", MySink.class);
public static class InfoSource extends AbstractInputTask<RandIS> {
private RecordWriter<IOReadableWritable> output;
@Override
public RandIS[] computeInputSplits(int requestedMinNumber)
throws Exception {
// TODO Auto-generated method stub
return null;
}
@Override
public Class<RandIS> getInputSplitType() {
// TODO Auto-generated method stub
return null;
}
@Override
public void registerInputOutput() {
Class<? extends ChannelSelector<IOReadableWritable>> myPartitioner = getTaskConfiguration().getClass("partitioner",DefaultPartitioner.class, ChannelSelector.class);
try {
ChannelSelector<IOReadableWritable> mP = myPartitioner.newInstance();
output = new RecordWriter<IOReadableWritable>(this, IOReadableWritable.class, mP);
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void invoke() throws Exception {
for(int i=0; i<10; i++) {
//output.emit(new StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000)));
output.emit(new StringRecord("5 500"));
output.emit(new StringRecord("4 500"));
}
}
}
public static class QuerySource extends AbstractInputTask<RandIS> {
private RecordWriter<StringRecord> output;
@Override
public RandIS[] computeInputSplits(int requestedMinNumber)
throws Exception {
// TODO Auto-generated method stub
return null;
}
@Override
public Class<RandIS> getInputSplitType() {
// TODO Auto-generated method stub
return null;
}
@Override
public void registerInputOutput() {
output = new RecordWriter<StringRecord>(this, StringRecord.class, new StreamPartitioner());
}
@Override
public void invoke() throws Exception {
Random rnd = new Random();
for(int i=0; i<5; i++) {
//output.emit(new StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000)+" 500"));
output.emit(new StringRecord("5 510 100"));
output.emit(new StringRecord("4 510 100"));
}
}
}
public static class MySink extends AbstractOutputTask {
private RecordReader<StringRecord> input = null;
@Override
public void registerInputOutput() {
this.input = new RecordReader<StringRecord>(this, StringRecord.class);
}
@Override
public void invoke() throws Exception {
while(input.hasNext()) {
System.out.println(input.next().toString());
}
}
}
public static class MyStreamMap extends AbstractTask {
private RecordReader<StringRecord> inputInfo = null;
private RecordReader<StringRecord> inputQuery = null;
private RecordWriter<StringRecord> output = null;
private WorkerEngineExact engine = new WorkerEngineExact(10,1000,0);
@Override
public void invoke() throws Exception {
while (this.inputInfo.hasNext() && this.inputQuery.hasNext()) {
String[] info = inputInfo.next().toString().split(" ");
String[] query = inputQuery.next().toString().split(" ");
engine.put(Integer.parseInt(info[0]),Long.parseLong(info[1]));
this.output.emit(new StringRecord(info[0]+" "+info[1]));
this.output.emit(new StringRecord(String.valueOf(
engine.get(Long.parseLong(query[1]),Long.parseLong(query[2]),Integer.parseInt(query[0])))
));
}
while (this.inputInfo.hasNext()) {
StringRecord info = inputInfo.next();
this.output.emit(info);
}
while (this.inputQuery.hasNext()) {
StringRecord query = inputQuery.next();
this.output.emit(query);
}
}
@Override
public void registerInputOutput() {
this.inputInfo = new RecordReader<StringRecord>(this, StringRecord.class);
this.inputQuery = new RecordReader<StringRecord>(this, StringRecord.class);
this.output = new RecordWriter<StringRecord>(this, StringRecord.class);
}
}
@Override
public JobGraph getJobGraph() {
final JobGraph myJG = new JobGraph("MyStream");
// SOURCE
final JobInputVertex infoSource = new JobInputVertex("MyInfoSource", myJG);
//Configuration config = infoSource.getConfiguration();
//config.setClass("partition",StreamPartitioner.class);
infoSource.setInputClass(InfoSource.class);
final JobInputVertex querySource = new JobInputVertex("MyQuerySource", myJG);
//final TaskConfig config = new TaskConfig(querySource.getConfiguration());
querySource.setInputClass(QuerySource.class);
// TASK
final JobTaskVertex task1 = new JobTaskVertex("MyTask1", myJG);
task1.setTaskClass(MyStreamMap.class);
task1.setNumberOfSubtasks(2);
// SINK
final JobOutputVertex sink = new JobOutputVertex("MySink", myJG);
// final TaskConfig config = new TaskConfig(sink.getConfiguration());
sink.setOutputClass(MySink.class);
try {
infoSource.connectTo(task1, ChannelType.INMEMORY);
querySource.connectTo(task1, ChannelType.INMEMORY);
task1.connectTo(sink, ChannelType.INMEMORY);
} catch (JobGraphDefinitionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", StreamSource.class);
graphBuilder.setSource("querySource", QuerySource.class);
graphBuilder.setTask("cellTask", MyStreamMap.class, 2);
graphBuilder.setSink("sink", MySink.class);
graphBuilder.connect("infoSource", "cellTask", ChannelType.INMEMORY);
graphBuilder.connect("querySource", "cellTask", ChannelType.INMEMORY);
graphBuilder.connect("cellTask", "sink", ChannelType.INMEMORY);
return graphBuilder.getJobGraph();
//return myJG;
}
//return graphBuilder.getJobGraph();
return myJG;
}
}
package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
public class StreamSource extends AbstractInputTask<RandIS> {
private RecordWriter<IOReadableWritable> output;
@Override
public RandIS[] computeInputSplits(int requestedMinNumber)
throws Exception {
// TODO Auto-generated method stub
return null;
}
@Override
public Class<RandIS> getInputSplitType() {
// TODO Auto-generated method stub
return null;
}
@Override
public void registerInputOutput() {
Class<? extends ChannelSelector<IOReadableWritable>> MyPartitioner = getTaskConfiguration().getClass("partitioner",DefaultPartitioner.class, ChannelSelector.class);
try {
ChannelSelector<IOReadableWritable> myPartitioner = MyPartitioner.newInstance();
output = new RecordWriter<IOReadableWritable>(this, IOReadableWritable.class, myPartitioner);
}
catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void invoke() throws Exception {
for(int i=0; i<10; i++) {
//output.emit(new StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000)));
output.emit(new StringRecord("5 500"));
output.emit(new StringRecord("4 500"));
}
}
}
private RecordWriter<IOReadableWritable> output;
private Class<? extends ChannelSelector<IOReadableWritable>> Partitioner;
ChannelSelector<IOReadableWritable> partitioner;
private Class<? extends UserSourceInvokable> UserFunction;
private UserSourceInvokable userFunction;
public StreamSource() {
Partitioner = null;
UserFunction = null;
partitioner = null;
userFunction = null;
}
@Override
public RandIS[] computeInputSplits(int requestedMinNumber) throws Exception {
// TODO Auto-generated method stub
return null;
}
@Override
public Class<RandIS> getInputSplitType() {
// TODO Auto-generated method stub
return null;
}
private void setClassInputs() {
Partitioner = getTaskConfiguration().getClass("partitioner", DefaultPartitioner.class,ChannelSelector.class);
try {
partitioner = Partitioner.newInstance();
} catch (Exception e) {
}
UserFunction = getTaskConfiguration().getClass("userfunction", TestSourceInvokable.class,UserSourceInvokable.class);
try
{
userFunction = UserFunction.newInstance();
} catch (Exception e)
{
}
}
@Override
public void registerInputOutput() {
setClassInputs();
output = new RecordWriter<IOReadableWritable>(this,
IOReadableWritable.class, this.partitioner);
}
@Override
public void invoke() throws Exception {
userFunction.invoke(output);
}
}
package eu.stratosphere.streaming;
import java.util.ArrayList;
import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.nephele.template.AbstractTask;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
public class StreamTask extends AbstractTask {
private RecordWriter<IOReadableWritable> output;
private Class<? extends ChannelSelector<IOReadableWritable>> Partitioner;
ChannelSelector<IOReadableWritable> partitioner;
private Class<? extends UserTaskInvokable> UserFunction;
private UserTaskInvokable userFunction;
private RecordReader<IOReadableWritable> inputInfo = null;
private RecordReader<IOReadableWritable> inputQuery = null;
public StreamTask() {
Partitioner = null;
UserFunction = null;
partitioner = null;
userFunction = null;
}
private void setClassInputs() {
Partitioner = getTaskConfiguration().getClass("partitioner", DefaultPartitioner.class,ChannelSelector.class);
try {
partitioner = Partitioner.newInstance();
} catch (Exception e) {
}
UserFunction = getTaskConfiguration().getClass("userfunction", TestTaskInvokable.class,UserTaskInvokable.class);
try
{
userFunction = UserFunction.newInstance();
} catch (Exception e)
{
}
}
@Override
public void registerInputOutput() {
setClassInputs();
this.inputInfo = new RecordReader<IOReadableWritable>(this, IOReadableWritable.class);
this.inputQuery = new RecordReader<IOReadableWritable>(this, IOReadableWritable.class);
output = new RecordWriter<IOReadableWritable>(this, IOReadableWritable.class, this.partitioner);
}
@Override
public void invoke() throws Exception {
List< RecordReader<IOReadableWritable>> inputs = new ArrayList< RecordReader<IOReadableWritable>>();
inputs.add(inputInfo);
inputs.add(inputQuery);
userFunction.invoke(inputs,output);
}
}
package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.RecordWriter;
public class TestSourceInvokable implements UserSourceInvokable {
@Override
public void invoke(RecordWriter<IOReadableWritable> output) throws Exception {
for (int i = 0; i < 10; i++) {
// output.emit(new StringRecord(rnd.nextInt(10)+" "+rnd.nextInt(1000)));
output.emit(new StringRecord("5 500"));
output.emit(new StringRecord("4 500"));
}
}
}
package eu.stratosphere.streaming;
import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
public class TestTaskInvokable implements UserTaskInvokable {
private WorkerEngineExact engine = new WorkerEngineExact(10, 1000, 0);
@Override
public void invoke(List<RecordReader<IOReadableWritable>> inputs,
RecordWriter<IOReadableWritable> output) throws Exception {
RecordReader<IOReadableWritable> input1= inputs.get(0);
RecordReader<IOReadableWritable> input2= inputs.get(0);
while (input1.hasNext() && input2.hasNext()) {
String[] info = input1.next().toString().split(" ");
String[] query = input2.next().toString().split(" ");
engine.put(Integer.parseInt(info[0]), Long.parseLong(info[1]));
output.emit(new StringRecord(info[0] + " " + info[1]));
output.emit(new StringRecord(String.valueOf(engine.get(
Long.parseLong(query[1]), Long.parseLong(query[2]),
Integer.parseInt(query[0])))));
}
while (inputs.get(0).hasNext()) {
IOReadableWritable info = inputs.get(0).next();
output.emit(info);
}
while (inputs.get(1).hasNext()) {
IOReadableWritable query = inputs.get(1).next();
output.emit(query);
}
}
}
package eu.stratosphere.streaming;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.RecordWriter;
public interface UserSourceInvokable {
public void invoke(RecordWriter<IOReadableWritable> output) throws Exception ;
}
package eu.stratosphere.streaming;
import java.util.List;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.nephele.io.RecordWriter;
public interface UserTaskInvokable {
public void invoke(List<RecordReader<IOReadableWritable>> inputs,
RecordWriter<IOReadableWritable> output) throws Exception;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册