提交 6e3809fb 编写于 作者: M Márton Balassi 提交者: Stephan Ewen

[streaming] Added Source & Sink parallelism

上级 a3809ee2
......@@ -49,7 +49,7 @@ import eu.stratosphere.types.Key;
*
*/
public class JobGraphBuilder {
private static final Log log = LogFactory.getLog(JobGraphBuilder.class);
private final JobGraph jobGraph;
private Map<String, AbstractJobVertex> components;
......@@ -60,7 +60,7 @@ public class JobGraphBuilder {
* Creates a new JobGraph with the given name
*
* @param jobGraphName
* Name of the JobGraph
* Name of the JobGraph
*/
public JobGraphBuilder(String jobGraphName) {
jobGraph = new JobGraph(jobGraphName);
......@@ -68,18 +68,17 @@ public class JobGraphBuilder {
numberOfInstances = new HashMap<String, Integer>();
numberOfOutputChannels = new HashMap<String, Integer>();
log.debug("JobGraph created");
}
/**
* Adds a source component to the JobGraph
*
* @param sourceName
* Name of the source component
* Name of the source component
* @param InvokableClass
* User defined class describing the source
* User defined class describing the source
*/
// TODO: Add source parallelism
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass) {
......@@ -94,18 +93,67 @@ public class JobGraphBuilder {
log.debug("Source set: " + sourceName);
}
/**
 * Adds a source component to the JobGraph, running with the given number
 * of parallel instances.
 *
 * @param sourceName
 *            Name of the source component
 * @param InvokableClass
 *            User defined class describing the source
 * @param parallelism
 *            Number of task instances of this type to run in parallel
 */
public void setSource(String sourceName,
		final Class<? extends UserSourceInvokable> InvokableClass,
		int parallelism) {
	final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
	source.setInputClass(StreamSource.class);
	source.setNumberOfSubtasks(parallelism);
	Configuration config = new TaskConfig(source.getConfiguration())
			.getConfiguration();
	config.setClass("userfunction", InvokableClass);
	config.setString("componentName", sourceName);
	components.put(sourceName, source);
	// BUGFIX: record the actual parallelism instead of the hard-coded 1.
	// numberOfInstances is read when connecting components (e.g. in
	// broadcastConnect) to compute the number of output channels, so a
	// stale value of 1 would undercount channels for parallel sources.
	numberOfInstances.put(sourceName, parallelism);
	log.debug("Source set: " + sourceName);
}
/**
 * Registers a task component in the JobGraph, running as a single
 * instance (default parallelism of one).
 *
 * @param taskName
 *            Name of the task component
 * @param InvokableClass
 *            User defined class describing the task
 */
public void setTask(String taskName,
		final Class<? extends UserTaskInvokable> InvokableClass) {
	final JobTaskVertex taskVertex = new JobTaskVertex(taskName, jobGraph);
	taskVertex.setTaskClass(StreamTask.class);
	// Store the user function and the component name in the vertex
	// configuration so the StreamTask can instantiate them at runtime.
	final Configuration taskConfig = new TaskConfig(
			taskVertex.getConfiguration()).getConfiguration();
	taskConfig.setClass("userfunction", InvokableClass);
	taskConfig.setString("componentName", taskName);
	components.put(taskName, taskVertex);
	numberOfInstances.put(taskName, 1);
	log.debug("Task set: " + taskName);
}
/**
* Adds a task component to the JobGraph
*
* @param taskName
* Name of the task component
* @param InvokableClass
* User defined class describing the task
* @param parallelism
* Number of task instances of this type to run in parallel
* Number of task instances of this type to run in parallel
*/
public void setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass, int parallelism) {
final Class<? extends UserTaskInvokable> InvokableClass,
int parallelism) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
......@@ -123,9 +171,9 @@ public class JobGraphBuilder {
* Adds a sink component to the JobGraph
*
* @param sinkName
* Name of the sink component
* Name of the sink component
* @param InvokableClass
* User defined class describing the sink
* User defined class describing the sink
*/
public void setSink(String sinkName,
final Class<? extends UserSinkInvokable> InvokableClass) {
......@@ -141,25 +189,53 @@ public class JobGraphBuilder {
log.debug("Sink set: " + sinkName);
}
/**
 * Adds a sink component to the JobGraph, running with the given number
 * of parallel instances.
 *
 * @param sinkName
 *            Name of the sink component
 * @param InvokableClass
 *            User defined class describing the sink
 * @param parallelism
 *            Number of task instances of this type to run in parallel
 */
public void setSink(String sinkName,
		final Class<? extends UserSinkInvokable> InvokableClass,
		int parallelism) {
	final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
	sink.setOutputClass(StreamSink.class);
	sink.setNumberOfSubtasks(parallelism);
	Configuration config = new TaskConfig(sink.getConfiguration())
			.getConfiguration();
	config.setClass("userfunction", InvokableClass);
	config.setString("componentName", sinkName);
	components.put(sinkName, sink);
	// BUGFIX: record the actual parallelism instead of the hard-coded 1.
	// numberOfInstances drives output-channel counting when this component
	// is the downstream side of a connection, so it must match the number
	// of subtasks configured above.
	numberOfInstances.put(sinkName, parallelism);
	log.debug("Sink set: " + sinkName);
}
/**
* Connects to JobGraph components with the given names, partitioning and
* channel type
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the records
* Name of the upstream component, that will emit the records
* @param downStreamComponentName
* Name of the downstream component, that will receive the records
* Name of the downstream component, that will receive the
* records
* @param PartitionerClass
* Class of the partitioner
* Class of the partitioner
* @param channelType
* Channel Type
* Channel Type
*/
private void connect(String upStreamComponentName,
String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass,
ChannelType channelType) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex upStreamComponent = components
.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
......@@ -171,9 +247,15 @@ public class JobGraphBuilder {
"partitionerClass_"
+ upStreamComponent.getNumberOfForwardConnections(),
PartitionerClass);
log.debug("Components connected with " + PartitionerClass.getSimpleName() + ": " + upStreamComponentName + " to " + downStreamComponentName);
log.debug("Components connected with "
+ PartitionerClass.getSimpleName() + ": "
+ upStreamComponentName + " to " + downStreamComponentName);
} catch (JobGraphDefinitionException e) {
log.error("Cannot connect components with " + PartitionerClass.getSimpleName() + " : " + upStreamComponentName + " to " + downStreamComponentName, e);
log.error(
"Cannot connect components with "
+ PartitionerClass.getSimpleName() + " : "
+ upStreamComponentName + " to "
+ downStreamComponentName, e);
}
}
......@@ -184,9 +266,10 @@ public class JobGraphBuilder {
* the output instances
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the records
* Name of the upstream component, that will emit the records
* @param downStreamComponentName
* Name of the downstream component, that will receive the records
* Name of the downstream component, that will receive the
* records
*/
public void broadcastConnect(String upStreamComponentName,
String downStreamComponentName) {
......@@ -195,43 +278,46 @@ public class JobGraphBuilder {
BroadcastPartitioner.class, ChannelType.INMEMORY);
if (numberOfOutputChannels.containsKey(upStreamComponentName)) {
numberOfOutputChannels.put(
upStreamComponentName,
numberOfOutputChannels.put(upStreamComponentName,
numberOfOutputChannels.get(upStreamComponentName)
+ numberOfInstances.get(downStreamComponentName));
} else {
numberOfOutputChannels.put(upStreamComponentName,
numberOfInstances.get(downStreamComponentName));
}
//log.debug("Components connected with broadcast: " + upStreamComponentName + " to " + downStreamComponentName);
// log.debug("Components connected with broadcast: " +
// upStreamComponentName + " to " + downStreamComponentName);
}
/**
* Connects two components with the given names by fields partitioning on the
* given field.
* Connects two components with the given names by fields partitioning on
* the given field.
* <p>
* Fields partitioning: Tuples are hashed by the given key, and grouped to
* outputs accordingly
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the records
* Name of the upstream component, that will emit the records
* @param downStreamComponentName
* Name of the downstream component, that will receive the records
* Name of the downstream component, that will receive the
* records
* @param keyPosition
* Position of key in the record
* Position of key in the record
* @param keyClass
* Class of the key Value stored in the record
* Class of the key Value stored in the record
*/
public void fieldsConnect(String upStreamComponentName,
String downStreamComponentName, int keyPosition,
Class<? extends Key> keyClass) {
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex upStreamComponent = components
.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, ChannelType.INMEMORY);
upStreamComponent.connectTo(downStreamComponent,
ChannelType.INMEMORY);
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
......@@ -243,29 +329,36 @@ public class JobGraphBuilder {
config.setClass(
"partitionerClassParam_"
+ upStreamComponent.getNumberOfForwardConnections(), keyClass);
+ upStreamComponent.getNumberOfForwardConnections(),
keyClass);
config.setInteger(
"partitionerIntParam_"
+ upStreamComponent.getNumberOfForwardConnections(), keyPosition);
+ upStreamComponent.getNumberOfForwardConnections(),
keyPosition);
addOutputChannels(upStreamComponentName);
log.debug("Components connected by field: " + upStreamComponentName + " to " + downStreamComponentName + " by key position " + keyPosition);
log.debug("Components connected by field: " + upStreamComponentName
+ " to " + downStreamComponentName + " by key position "
+ keyPosition);
} catch (JobGraphDefinitionException e) {
log.error("Cannot connect components by field: " + upStreamComponentName + " to " + downStreamComponentName, e);
log.error("Cannot connect components by field: "
+ upStreamComponentName + " to " + downStreamComponentName,
e);
}
}
/**
* Connects two components with the given names by global partitioning.
* <p>
* Global partitioning: sends all emitted records to one output instance (i.e.
* the first one)
* Global partitioning: sends all emitted records to one output instance
* (i.e. the first one)
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the records
* Name of the upstream component, that will emit the records
* @param downStreamComponentName
* Name of the downstream component, that will receive the records
* Name of the downstream component, that will receive the
* records
*/
public void globalConnect(String upStreamComponentName,
String downStreamComponentName) {
......@@ -284,9 +377,10 @@ public class JobGraphBuilder {
* channel
*
* @param upStreamComponentName
* Name of the upstream component, that will emit the records
* Name of the upstream component, that will emit the records
* @param downStreamComponentName
* Name of the downstream component, that will receive the records
* Name of the downstream component, that will receive the
* records
*/
public void shuffleConnect(String upStreamComponentName,
String downStreamComponentName) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册