提交 bb01ea01 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] JobGraphBuilder documentation update

上级 3b5ec860
......@@ -45,7 +45,6 @@ import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
import eu.stratosphere.streaming.partitioner.ShufflePartitioner;
import eu.stratosphere.types.Key;
/**
* Object for building Stratosphere stream processing job graphs
......@@ -80,7 +79,8 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the source
*/
public void setSource(String sourceName, final Class<? extends UserSourceInvokable> InvokableClass) {
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass) {
setSource(sourceName, InvokableClass, 1);
}
......@@ -94,7 +94,9 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
public void setSource(String sourceName, final Class<? extends UserSourceInvokable> InvokableClass, int parallelism) {
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass,
int parallelism) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, InvokableClass, parallelism, source);
......@@ -109,7 +111,8 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the task
*/
public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass) {
public void setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass) {
setTask(taskName, InvokableClass, 1);
}
......@@ -123,7 +126,9 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass, int parallelism) {
public void setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass,
int parallelism) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, InvokableClass, parallelism, task);
......@@ -138,7 +143,8 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the sink
*/
public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass) {
public void setSink(String sinkName,
final Class<? extends UserSinkInvokable> InvokableClass) {
setSink(sinkName, InvokableClass, 1);
}
......@@ -152,22 +158,27 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass, int parallelism) {
public void setSink(String sinkName,
final Class<? extends UserSinkInvokable> InvokableClass,
int parallelism) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, InvokableClass, parallelism, sink);
log.debug("SINK: " + sinkName);
}
private void setComponent(String componentName, final Class<? extends UserInvokable> InvokableClass, int parallelism, AbstractJobVertex component) {
private void setComponent(String componentName,
final Class<? extends UserInvokable> InvokableClass,
int parallelism, AbstractJobVertex component) {
component.setNumberOfSubtasks(parallelism);
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(component.getConfiguration())
.getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
components.put(componentName, component);
numberOfInstances.put(componentName, parallelism);
}
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
......@@ -182,22 +193,33 @@ public class JobGraphBuilder {
* @param channelType
* Channel Type
*/
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass, ChannelType channelType) {
private void connect(String upStreamComponentName,
String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass,
ChannelType channelType) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
AbstractJobVertex upStreamComponent = components
.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, channelType);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration()).getConfiguration();
config.setClass("partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
config.setClass(
"partitionerClass_"
+ (upStreamComponent
.getNumberOfForwardConnections() - 1),
PartitionerClass);
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - " + upStreamComponentName + " -> "
+ downStreamComponentName);
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
} catch (JobGraphDefinitionException e) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName() + " : "
+ upStreamComponentName + " -> " + downStreamComponentName, e);
log.error(
"Cannot connect components with "
+ PartitionerClass.getSimpleName() + " : "
+ upStreamComponentName + " -> "
+ downStreamComponentName, e);
}
}
......@@ -213,9 +235,12 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName, numberOfInstances.get(downStreamComponentName));
public void broadcastConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
BroadcastPartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName,
numberOfInstances.get(downStreamComponentName));
}
/**
......@@ -231,33 +256,42 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
* @param keyPosition
* Position of key in the record
* @param keyClass
* Class of the key Value stored in the record
* Position of key in the tuple
*/
//TODO: remove unused third parameter
public void fieldsConnect(String upStreamComponentName, String downStreamComponentName, int keyPosition) {
public void fieldsConnect(String upStreamComponentName,
String downStreamComponentName, int keyPosition) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
AbstractJobVertex upStreamComponent = components
.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.INMEMORY);
upStreamComponent.connectTo(downStreamComponent,
ChannelType.INMEMORY);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
config.setClass("partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
config.setClass(
"partitionerClass_"
+ (upStreamComponent
.getNumberOfForwardConnections() - 1),
FieldsPartitioner.class);
config.setInteger("partitionerIntParam_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
config.setInteger(
"partitionerIntParam_"
+ (upStreamComponent
.getNumberOfForwardConnections() - 1),
keyPosition);
addOutputChannels(upStreamComponentName, 1);
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> " + downStreamComponentName
log.debug("CONNECTED: FIELD PARTITIONING - "
+ upStreamComponentName + " -> " + downStreamComponentName
+ ", KEY: " + keyPosition);
} catch (JobGraphDefinitionException e) {
log.error(
"Cannot connect components by field: " + upStreamComponentName + " to " + downStreamComponentName,
log.error("Cannot connect components by field: "
+ upStreamComponentName + " to " + downStreamComponentName,
e);
}
}
......@@ -274,8 +308,10 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class, ChannelType.INMEMORY);
public void globalConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
GlobalPartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName, 1);
}
......@@ -291,34 +327,43 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class, ChannelType.INMEMORY);
public void shuffleConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
ShufflePartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName, 1);
}
private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
private void addOutputChannels(String upStreamComponentName,
int numOfInstances) {
if (numberOfOutputChannels.containsKey(upStreamComponentName)) {
numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
numberOfOutputChannels.get(upStreamComponentName).add(
numOfInstances);
} else {
numberOfOutputChannels.put(upStreamComponentName, new ArrayList<Integer>());
numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
numberOfOutputChannels.put(upStreamComponentName,
new ArrayList<Integer>());
numberOfOutputChannels.get(upStreamComponentName).add(
numOfInstances);
}
}
private void setNumberOfJobInputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfInputs", component.getNumberOfBackwardConnections());
component.getConfiguration().setInteger("numberOfInputs",
component.getNumberOfBackwardConnections());
}
}
private void setNumberOfJobOutputs() {
for (AbstractJobVertex component : components.values()) {
component.getConfiguration().setInteger("numberOfOutputs", component.getNumberOfForwardConnections());
component.getConfiguration().setInteger("numberOfOutputs",
component.getNumberOfForwardConnections());
}
for (String component : numberOfOutputChannels.keySet()) {
Configuration config = components.get(component).getConfiguration();
List<Integer> channelNumList = numberOfOutputChannels.get(component);
List<Integer> channelNumList = numberOfOutputChannels
.get(component);
for (int i = 0; i < channelNumList.size(); i++) {
config.setInteger("channels_" + i, channelNumList.get(i));
}
......
......@@ -438,9 +438,9 @@ public class StreamRecord implements IOReadableWritable, Serializable {
* @param d
* New value
*/
public void setDouble(int tupleNumber, int fieldNumber, Double tuple) {
public void setDouble(int tupleNumber, int fieldNumber, Double d) {
try {
tupleBatch.get(tupleNumber).setField(tuple, fieldNumber);
tupleBatch.get(tupleNumber).setField(d, fieldNumber);
} catch (IndexOutOfBoundsException e) {
throw (new NoSuchTupleException());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册