Commit 78558649 authored by Gyula Fora, committed by Stephan Ewen

[streaming] StreamSource and StreamTask updated to support connection-level partitioning

Parent c1f7a09d
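In short: before this commit a vertex stored one component-wide ChannelSelector under the config key "partitioner"; after it, every forward connection i of a vertex carries its own selector class under "partitioner_" + i. The key is written by JobGraphBuilder.connect() and read back by StreamSource and StreamTask, which build one RecordWriter per connection. A condensed sketch of the convention, excerpted from the diff below (not self-contained):

// Writing side, JobGraphBuilder.connect(): the i-th forward connection of
// the upstream vertex gets its own partitioner class.
config.setClass("partitioner_" + upStreamComponent.getNumberOfForwardConnections(),
		getPartitioningClass(partitionType));

// Reading side, StreamSource/StreamTask.setConfigInputs(): one selector and
// one RecordWriter per declared output.
for (int i = 1; i <= numberOfOutputs; i++) {
	Partitioner = getTaskConfiguration().getClass("partitioner_" + i,
			DefaultPartitioner.class, ChannelSelector.class);
	partitioners.add(Partitioner.newInstance());
}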
......@@ -12,7 +12,6 @@ import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.nephele.template.AbstractInputTask;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
......@@ -22,79 +21,66 @@ import eu.stratosphere.types.Record;
public class JobGraphBuilder {
private final JobGraph jobGraph;
private Map<String, AbstractJobVertex> components;
public enum Partitioning {
BROADCAST
}
private Class<? extends ChannelSelector<Record>> getPartitioningClass(
Partitioning partitioning) {
switch (partitioning) {
case BROADCAST:
return DefaultPartitioner.class;
default:
return DefaultPartitioner.class;
}
}
public JobGraphBuilder(String jobGraphName) {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
}
// TODO: Add source parallelism
public void setSource(String sourceName,
final Class<? extends AbstractInputTask<?>> sourceClass) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(sourceClass);
components.put(sourceName, source);
}
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass,
Partitioning partitionType) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
Configuration config = new TaskConfig(source.getConfiguration())
.getConfiguration();
config.setClass("partitioner", getPartitioningClass(partitionType));
config.setClass("userfunction", InvokableClass);
components.put(sourceName, source);
}
public void setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass,
Partitioning partitionType, int parallelism) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
task.setNumberOfSubtasks(parallelism);
Configuration config = new TaskConfig(task.getConfiguration())
.getConfiguration();
config.setClass("partitioner", getPartitioningClass(partitionType));
config.setClass("userfunction", InvokableClass);
components.put(taskName, task);
}
public void setSink(String sinkName,
final Class<? extends UserSinkInvokable> InvokableClass) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
Configuration config = new TaskConfig(sink.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
components.put(sinkName, sink);
}
//TODO refactor connects
public void connect(String upStreamComponentName,
String downStreamComponentName,
private final JobGraph jobGraph;
private Map<String, AbstractJobVertex> components;
public enum Partitioning {
BROADCAST
}
private Class<? extends ChannelSelector<Record>> getPartitioningClass(
Partitioning partitioning) {
switch (partitioning) {
case BROADCAST:
return DefaultPartitioner.class;
default:
return DefaultPartitioner.class;
}
}
public JobGraphBuilder(String jobGraphName) {
jobGraph = new JobGraph(jobGraphName);
components = new HashMap<String, AbstractJobVertex>();
}
// TODO: Add source parallelism
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
Configuration config = new TaskConfig(source.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
components.put(sourceName, source);
}
public void setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass, int parallelism) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
task.setNumberOfSubtasks(parallelism);
Configuration config = new TaskConfig(task.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
components.put(taskName, task);
}
public void setSink(String sinkName,
final Class<? extends UserSinkInvokable> InvokableClass) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
Configuration config = new TaskConfig(sink.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
components.put(sinkName, sink);
}
public void connect(String upStreamComponentName,
String downStreamComponentName, Partitioning partitionType,
ChannelType channelType) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
......@@ -102,49 +88,37 @@ public class JobGraphBuilder {
.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, channelType);
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
config.setClass(
"partitioner_" + upStreamComponent.getNumberOfForwardConnections(),
getPartitioningClass(partitionType));
} catch (JobGraphDefinitionException e) {
e.printStackTrace();
}
}
public void connect(String upStreamComponentName,
String downStreamComponentName,
Partitioning partitionType,
ChannelType channelType) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, channelType);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass("partitioner_"+upStreamComponent.getNumberOfForwardConnections(), getPartitioningClass(partitionType));
} catch (JobGraphDefinitionException e) {
e.printStackTrace();
}
}
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfInputs",
component.getNumberOfBackwardConnections());
}
}
private void setNumberOfJobOutputs() {
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfInputs",
component.getNumberOfBackwardConnections());
}
}
private void setNumberOfJobOutputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfOutputs",
component.getNumberOfForwardConnections());
}
}
public JobGraph getJobGraph() {
setNumberOfJobInputs();
setNumberOfJobOutputs();
return jobGraph;
}
public JobGraph getJobGraph() {
setNumberOfJobInputs();
setNumberOfJobOutputs();
return jobGraph;
}
}
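DefaultPartitioner is referenced throughout (it is both the BROADCAST choice and the fallback in getPartitioningClass) but is not part of this diff. Assuming Nephele's ChannelSelector contract, where selectChannels returns the indices of the channels a record is sent to (an assumption, not shown in this commit), a broadcast-style default could look like this minimal sketch:

import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.types.Record;

// Hypothetical sketch: broadcast selector sending every record to every
// output channel of the connection it is attached to.
public class DefaultPartitioner implements ChannelSelector<Record> {

	@Override
	public int[] selectChannels(Record record, int numberOfOutputChannels) {
		int[] channels = new int[numberOfOutputChannels];
		for (int i = 0; i < numberOfOutputChannels; i++) {
			channels[i] = i;
		}
		return channels;
	}
}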
package eu.stratosphere.streaming.api;
import java.util.LinkedList;
import java.util.List;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.nephele.template.AbstractInputTask;
......@@ -11,64 +14,79 @@ import eu.stratosphere.types.Record;
public class StreamSource extends AbstractInputTask<RandIS> {
//TODO: Refactor names
private RecordWriter<Record> output;
private Class<? extends ChannelSelector<Record>> Partitioner;
ChannelSelector<Record> partitioner;
private Class<? extends UserSourceInvokable> UserFunction;
private UserSourceInvokable userFunction;
public StreamSource() {
//TODO: Make configuration file visible and call setClassInputs() here
Partitioner = null;
UserFunction = null;
partitioner = null;
userFunction = null;
}
//TODO: Learn relevance of InputSplits
@Override
public RandIS[] computeInputSplits(int requestedMinNumber) throws Exception {
return null;
}
@Override
public Class<RandIS> getInputSplitType() {
return null;
}
//TODO:Refactor key names,
//TODO:Add output/input number to config and store class instances in list
//TODO:Change default classes when done with JobGraphBuilder
//TODO:Change partitioning from component level to connection level -> output_1_partitioner, output_2_partitioner etc.
private void setClassInputs() {
Partitioner = getTaskConfiguration().getClass("partitioner",
DefaultPartitioner.class, ChannelSelector.class);
try {
partitioner = Partitioner.newInstance();
} catch (Exception e) {
}
UserFunction = getTaskConfiguration().getClass("userfunction",
TestSourceInvokable.class, UserSourceInvokable.class);
}
//TODO: Store outputs in List
@Override
public void registerInputOutput() {
setClassInputs();
output = new RecordWriter<Record>(this, Record.class, this.partitioner);
}
//TODO: call userFunction.invoke for all output channels
@Override
public void invoke() throws Exception {
try {
userFunction = UserFunction.newInstance();
} catch (Exception e) {
}
userFunction.invoke(output);
}
// TODO: Refactor names
private List<RecordWriter<Record>> outputs;
private Class<? extends ChannelSelector<Record>> Partitioner;
private List<ChannelSelector<Record>> partitioners;
private Class<? extends UserSourceInvokable> UserFunction;
private UserSourceInvokable userFunction;
private int numberOfOutputs;
public StreamSource() {
// TODO: Make configuration file visible and call setClassInputs() here
outputs = new LinkedList<RecordWriter<Record>>();
partitioners = new LinkedList<ChannelSelector<Record>>();
Partitioner = null;
UserFunction = null;
userFunction = null;
numberOfOutputs = 0;
}
@Override
public RandIS[] computeInputSplits(int requestedMinNumber) throws Exception {
return null;
}
@Override
public Class<RandIS> getInputSplitType() {
return null;
}
// TODO: Refactor key names
// TODO: Change default classes when done with JobGraphBuilder
private void setConfigInputs() {
UserFunction = getTaskConfiguration().getClass("userfunction",
TestSourceInvokable.class, UserSourceInvokable.class);
numberOfOutputs = getTaskConfiguration().getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
Partitioner = getTaskConfiguration().getClass("partitioner_" + i,
DefaultPartitioner.class, ChannelSelector.class);
try {
partitioners.add(Partitioner.newInstance());
} catch (Exception e) {
// Surface instantiation failures instead of swallowing them silently.
e.printStackTrace();
}
}
try {
userFunction = UserFunction.newInstance();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void registerInputOutput() {
setConfigInputs();
for (ChannelSelector<Record> partitioner : partitioners) {
outputs.add(new RecordWriter<Record>(this, Record.class, partitioner));
}
}
@Override
public void invoke() throws Exception {
for (RecordWriter<Record> output : outputs) {
userFunction.invoke(output);
}
}
}
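A user source is plugged in through the "userfunction" key and implements UserSourceInvokable; judging from DefaultSourceInvokable further down in this diff, the contract is a single invoke(RecordWriter<Record>) method. A minimal sketch under that assumption, with hypothetical names:

import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;

// Hypothetical example source: emits one single-field record per word.
public class GreetingSource implements UserSourceInvokable {

	@Override
	public void invoke(RecordWriter<Record> output) throws Exception {
		for (String word : "hello streaming world".split(" ")) {
			output.emit(new Record(new StringValue(word)));
		}
	}
}

Note the design choice in StreamSource.invoke() above: the user function runs once per RecordWriter, i.e. once per forward connection, not once per source.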
......@@ -14,66 +14,82 @@ import eu.stratosphere.types.Record;
public class StreamTask extends AbstractTask {
// TODO: Refactor names
private RecordWriter<Record> output;
private Class<? extends ChannelSelector<Record>> Partitioner;
private List<RecordReader<Record>> inputs;
ChannelSelector<Record> partitioner;
private Class<? extends UserTaskInvokable> UserFunction;
private UserTaskInvokable userFunction;
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList<RecordReader<Record>>();
Partitioner = null;
UserFunction = null;
partitioner = null;
userFunction = null;
}
// TODO:Refactor key names,
// TODO:Add output/input number to config and store class instances in list
// TODO:Change default classes when done with JobGraphBuilder
private void setClassInputs() {
Partitioner = getTaskConfiguration().getClass("partitioner",
DefaultPartitioner.class, ChannelSelector.class);
try {
partitioner = Partitioner.newInstance();
} catch (Exception e) {
}
UserFunction = getTaskConfiguration().getClass("userfunction",
TestTaskInvokable.class, UserTaskInvokable.class);
try {
userFunction = UserFunction.newInstance();
} catch (Exception e) {
}
}
@Override
public void registerInputOutput() {
setClassInputs();
int numberOfInputs = getTaskConfiguration().getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
inputs.add(new RecordReader<Record>(this, Record.class));
}
output = new RecordWriter<Record>(this, Record.class, this.partitioner);
}
@Override
public void invoke() throws Exception {
boolean hasInput = true;
while (hasInput) {
hasInput = false;
for (RecordReader<Record> input : inputs) {
if (input.hasNext()) {
hasInput = true;
userFunction.invoke(input.next(), output);
}
}
}
}
// TODO: Refactor names
private Class<? extends ChannelSelector<Record>> Partitioner;
private List<RecordReader<Record>> inputs;
private List<RecordWriter<Record>> outputs;
private List<ChannelSelector<Record>> partitioners;
private Class<? extends UserTaskInvokable> UserFunction;
private UserTaskInvokable userFunction;
private int numberOfInputs;
private int numberOfOutputs;
public StreamTask() {
// TODO: Make configuration file visible and call setClassInputs() here
inputs = new LinkedList<RecordReader<Record>>();
outputs = new LinkedList<RecordWriter<Record>>();
Partitioner = null;
UserFunction = null;
partitioners = new LinkedList<ChannelSelector<Record>>();
userFunction = null;
numberOfInputs = 0;
numberOfOutputs = 0;
}
// TODO: Refactor key names
// TODO: Change default classes when done with JobGraphBuilder
private void setConfigInputs() {
numberOfInputs = getTaskConfiguration().getInteger("numberOfInputs", 0);
for (int i = 0; i < numberOfInputs; i++) {
inputs.add(new RecordReader<Record>(this, Record.class));
}
numberOfOutputs = getTaskConfiguration().getInteger("numberOfOutputs", 0);
for (int i = 1; i <= numberOfOutputs; i++) {
Partitioner = getTaskConfiguration().getClass("partitioner_" + i,
DefaultPartitioner.class, ChannelSelector.class);
try {
partitioners.add(Partitioner.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
UserFunction = getTaskConfiguration().getClass("userfunction",
TestTaskInvokable.class, UserTaskInvokable.class);
try {
userFunction = UserFunction.newInstance();
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void registerInputOutput() {
setConfigInputs();
for (ChannelSelector<Record> partitioner : partitioners) {
outputs.add(new RecordWriter<Record>(this, Record.class, partitioner));
}
}
// TODO: Performance with multiple outputs
@Override
public void invoke() throws Exception {
boolean hasInput = true;
while (hasInput) {
hasInput = false;
for (RecordReader<Record> input : inputs) {
if (input.hasNext()) {
hasInput = true;
// Read each record once, then fan the same record out to every
// output; calling input.next() inside the output loop would consume
// a different record per output writer.
Record record = input.next();
for (RecordWriter<Record> output : outputs) {
userFunction.invoke(record, output);
}
}
}
}
}
}
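The task-side contract, again taken from the default implementations below, is invoke(Record, RecordWriter<Record>). A pass-through sketch with hypothetical names:

import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.types.Record;

// Hypothetical forwarding task: emits each record unchanged and leaves the
// channel choice to the per-connection partitioner configured upstream.
public class ForwardingTask implements UserTaskInvokable {

	@Override
	public void invoke(Record record, RecordWriter<Record> output) throws Exception {
		output.emit(record);
	}
}

StreamTask.invoke() above busy-polls all readers until none reports hasNext(), handing each available record to the user function once per output writer.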
package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.nephele.io.RecordReader;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class DefaultSinkInvokable implements UserSinkInvokable {
@Override
public void invoke(Record record, RecordReader<Record> input)
throws Exception {
StringValue value = new StringValue("");
record.getFieldInto(0, value);
System.out.println(value.getValue());
}
}
......@@ -2,18 +2,10 @@ package eu.stratosphere.streaming.api.invokable;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
public class DefaultSourceInvokable implements UserSourceInvokable {
private String motto = "Stratosphere -- Big Data looks tiny from here";
private String[] mottoArray = motto.split(" ");
@Override
public void invoke(RecordWriter<Record> output) throws Exception {
for (CharSequence word : mottoArray) {
output.emit(new Record(new StringValue(word)));
}
}
@Override
public void invoke(RecordWriter<Record> output) throws Exception {}
}
......@@ -5,10 +5,7 @@ import eu.stratosphere.types.Record;
public class DefaultTaskInvokable implements UserTaskInvokable {
@Override
public void invoke(Record record, RecordWriter<Record> output)
throws Exception {
output.emit(record);
}
@Override
public void invoke(Record record, RecordWriter<Record> output) throws Exception {}
}
package eu.stratosphere.streaming.test.cellinfo;
package eu.stratosphere.streaming.cellinfo;
public interface IWorkerEngine {
public int get(long timeStamp, long lastMillis, int cellId);
......
package eu.stratosphere.streaming.test.cellinfo;
package eu.stratosphere.streaming.cellinfo;
public class Util {
public static int mod(int x, int y) {
......
package eu.stratosphere.streaming.test.cellinfo;
package eu.stratosphere.streaming.cellinfo;
import eu.stratosphere.streaming.test.cellinfo.Util;
import eu.stratosphere.streaming.cellinfo.Util;
public class WorkerEngineBin implements java.io.Serializable, IWorkerEngine {
private long unitLength_;
......
......@@ -7,19 +7,21 @@ import eu.stratosphere.streaming.api.JobGraphBuilder.Partitioning;
import eu.stratosphere.test.util.TestBase2;
public class MyStream extends TestBase2 {
@Override
public JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", TestSourceInvokable.class, Partitioning.BROADCAST);
graphBuilder.setSource("querySource", QuerySourceInvokable.class, Partitioning.BROADCAST);
graphBuilder.setTask("cellTask", TestTaskInvokable.class, Partitioning.BROADCAST, 2);
graphBuilder.setSource("infoSource", TestSourceInvokable.class);
graphBuilder.setSource("querySource", QuerySourceInvokable.class);
graphBuilder.setTask("cellTask", TestTaskInvokable.class, 2);
graphBuilder.setSink("sink", TestSinkInvokable.class);
graphBuilder.connect("infoSource", "cellTask", Partitioning.BROADCAST, ChannelType.INMEMORY);
graphBuilder.connect("querySource", "cellTask", Partitioning.BROADCAST, ChannelType.INMEMORY);
graphBuilder.connect("cellTask", "sink", Partitioning.BROADCAST, ChannelType.INMEMORY);
graphBuilder.connect("infoSource", "cellTask", Partitioning.BROADCAST,
ChannelType.INMEMORY);
graphBuilder.connect("querySource", "cellTask", Partitioning.BROADCAST,
ChannelType.INMEMORY);
graphBuilder.connect("cellTask", "sink", Partitioning.BROADCAST,
ChannelType.INMEMORY);
return graphBuilder.getJobGraph();
}
......
......@@ -2,7 +2,7 @@ package eu.stratosphere.streaming.test;
import eu.stratosphere.nephele.io.RecordWriter;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.test.cellinfo.WorkerEngineExact;
import eu.stratosphere.streaming.cellinfo.WorkerEngineExact;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
......