提交 eaf7febb 编写于 作者: G gyfora 提交者: Stephan Ewen

[streaming] jobgraphbuilder update

上级 7ed31799
......@@ -56,6 +56,8 @@ public class JobGraphBuilder {
private Map<String, AbstractJobVertex> components;
private Map<String, Integer> numberOfInstances;
private Map<String, List<Integer>> numberOfOutputChannels;
private String maxParallelismVertexName;
private int maxParallelism;
/**
* Creates a new JobGraph with the given name
......@@ -68,6 +70,8 @@ public class JobGraphBuilder {
components = new HashMap<String, AbstractJobVertex>();
numberOfInstances = new HashMap<String, Integer>();
numberOfOutputChannels = new HashMap<String, List<Integer>>();
maxParallelismVertexName = "";
maxParallelism = 0;
log.debug("JobGraph created");
}
......@@ -81,7 +85,7 @@ public class JobGraphBuilder {
*/
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass) {
setSource(sourceName, InvokableClass, 1);
setSource(sourceName, InvokableClass, 1, 1);
}
/**
......@@ -95,11 +99,11 @@ public class JobGraphBuilder {
* Number of task instances of this type to run in parallel
*/
public void setSource(String sourceName,
final Class<? extends UserSourceInvokable> InvokableClass,
int parallelism) {
final Class<? extends UserSourceInvokable> InvokableClass, int parallelism,
int subtasksPerInstance) {
final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
source.setInputClass(StreamSource.class);
setComponent(sourceName, InvokableClass, parallelism, source);
setComponent(sourceName, InvokableClass, parallelism, subtasksPerInstance, source);
log.debug("SOURCE: " + sourceName);
}
......@@ -111,9 +115,8 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the task
*/
public void setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass) {
setTask(taskName, InvokableClass, 1);
public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass) {
setTask(taskName, InvokableClass, 1, 1);
}
/**
......@@ -126,12 +129,11 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
public void setTask(String taskName,
final Class<? extends UserTaskInvokable> InvokableClass,
int parallelism) {
public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass,
int parallelism, int subtasksPerInstance) {
final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
task.setTaskClass(StreamTask.class);
setComponent(taskName, InvokableClass, parallelism, task);
setComponent(taskName, InvokableClass, parallelism, subtasksPerInstance, task);
log.debug("TASK: " + taskName);
}
......@@ -143,9 +145,8 @@ public class JobGraphBuilder {
* @param InvokableClass
* User defined class describing the sink
*/
public void setSink(String sinkName,
final Class<? extends UserSinkInvokable> InvokableClass) {
setSink(sinkName, InvokableClass, 1);
public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass) {
setSink(sinkName, InvokableClass, 1, 1);
}
/**
......@@ -158,23 +159,26 @@ public class JobGraphBuilder {
* @param parallelism
* Number of task instances of this type to run in parallel
*/
public void setSink(String sinkName,
final Class<? extends UserSinkInvokable> InvokableClass,
int parallelism) {
public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass,
int parallelism, int subtasksPerInstance) {
final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
sink.setOutputClass(StreamSink.class);
setComponent(sinkName, InvokableClass, parallelism, sink);
setComponent(sinkName, InvokableClass, parallelism, subtasksPerInstance, sink);
log.debug("SINK: " + sinkName);
}
private void setComponent(String componentName,
final Class<? extends UserInvokable> InvokableClass,
int parallelism, AbstractJobVertex component) {
final Class<? extends UserInvokable> InvokableClass, int parallelism,
int subtasksPerInstance, AbstractJobVertex component) {
component.setNumberOfSubtasks(parallelism);
component.setNumberOfSubtasksPerInstance(parallelism);
component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
Configuration config = new TaskConfig(component.getConfiguration())
.getConfiguration();
if (parallelism > maxParallelism) {
maxParallelism = parallelism;
maxParallelismVertexName = componentName;
}
Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
config.setClass("userfunction", InvokableClass);
config.setString("componentName", componentName);
components.put(componentName, component);
......@@ -195,36 +199,39 @@ public class JobGraphBuilder {
* @param channelType
* Channel Type
*/
private void connect(String upStreamComponentName,
String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass,
ChannelType channelType) {
private void connect(String upStreamComponentName, String downStreamComponentName,
Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
AbstractJobVertex upStreamComponent = components
.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent, channelType);
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass(
"partitionerClass_"
+ (upStreamComponent
.getNumberOfForwardConnections() - 1),
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
PartitionerClass);
log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
+ upStreamComponentName + " -> " + downStreamComponentName);
} catch (JobGraphDefinitionException e) {
log.error(
"Cannot connect components with "
+ PartitionerClass.getSimpleName() + " : "
+ upStreamComponentName + " -> "
+ downStreamComponentName, e);
log.error("Cannot connect components with " + PartitionerClass.getSimpleName() + " : "
+ upStreamComponentName + " -> " + downStreamComponentName, e);
}
}
/**
 * Makes every registered component share instances with the component that
 * has the highest parallelism (tracked in {@code maxParallelismVertexName}
 * by {@code setComponent}), leaving that max-parallelism vertex itself
 * untouched.
 */
public void setInstanceSharing() {
	AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);
	for (String componentName : components.keySet()) {
		// Use equals(), not != : reference comparison on Strings only works by
		// accident (interning) and would fail for runtime-built names, causing
		// the max-parallelism vertex to share instances with itself.
		if (!componentName.equals(maxParallelismVertexName)) {
			components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
		}
	}
}
/**
* Connects two components with the given names by broadcast partitioning.
* <p>
......@@ -237,12 +244,9 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void broadcastConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
BroadcastPartitioner.class, ChannelType.INMEMORY);
addOutputChannels(upStreamComponentName,
numberOfInstances.get(downStreamComponentName));
public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class);
addOutputChannels(upStreamComponentName, numberOfInstances.get(downStreamComponentName));
}
/**
......@@ -260,41 +264,32 @@ public class JobGraphBuilder {
* @param keyPosition
* Position of key in the tuple
*/
public void fieldsConnect(String upStreamComponentName,
String downStreamComponentName, int keyPosition) {
public void fieldsConnect(String upStreamComponentName, String downStreamComponentName,
int keyPosition) {
AbstractJobVertex upStreamComponent = components
.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components
.get(downStreamComponentName);
AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
try {
upStreamComponent.connectTo(downStreamComponent,
ChannelType.INMEMORY);
upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
Configuration config = new TaskConfig(
upStreamComponent.getConfiguration()).getConfiguration();
Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
.getConfiguration();
config.setClass(
"partitionerClass_"
+ (upStreamComponent
.getNumberOfForwardConnections() - 1),
"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
FieldsPartitioner.class);
config.setInteger(
"partitionerIntParam_"
+ (upStreamComponent
.getNumberOfForwardConnections() - 1),
keyPosition);
+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
addOutputChannels(upStreamComponentName, 1);
log.debug("CONNECTED: FIELD PARTITIONING - "
+ upStreamComponentName + " -> " + downStreamComponentName
+ ", KEY: " + keyPosition);
log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
+ downStreamComponentName + ", KEY: " + keyPosition);
} catch (JobGraphDefinitionException e) {
log.error("Cannot connect components by field: "
+ upStreamComponentName + " to " + downStreamComponentName,
e);
log.error("Cannot connect components by field: " + upStreamComponentName + " to "
+ downStreamComponentName, e);
}
}
......@@ -310,10 +305,8 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void globalConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
GlobalPartitioner.class, ChannelType.INMEMORY);
public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
addOutputChannels(upStreamComponentName, 1);
}
......@@ -329,23 +322,17 @@ public class JobGraphBuilder {
* Name of the downstream component, that will receive the
* records
*/
public void shuffleConnect(String upStreamComponentName,
String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName,
ShufflePartitioner.class, ChannelType.INMEMORY);
public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class);
addOutputChannels(upStreamComponentName, 1);
}
private void addOutputChannels(String upStreamComponentName,
int numOfInstances) {
private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
if (numberOfOutputChannels.containsKey(upStreamComponentName)) {
numberOfOutputChannels.get(upStreamComponentName).add(
numOfInstances);
numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
} else {
numberOfOutputChannels.put(upStreamComponentName,
new ArrayList<Integer>());
numberOfOutputChannels.get(upStreamComponentName).add(
numOfInstances);
numberOfOutputChannels.put(upStreamComponentName, new ArrayList<Integer>());
numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
}
}
......@@ -364,8 +351,7 @@ public class JobGraphBuilder {
for (String component : numberOfOutputChannels.keySet()) {
Configuration config = components.get(component).getConfiguration();
List<Integer> channelNumList = numberOfOutputChannels
.get(component);
List<Integer> channelNumList = numberOfOutputChannels.get(component);
for (int i = 0; i < channelNumList.size(); i++) {
config.setInteger("channels_" + i, channelNumList.get(i));
}
......@@ -376,6 +362,7 @@ public class JobGraphBuilder {
* @return The JobGraph object
*/
public JobGraph getJobGraph() {
setInstanceSharing();
setNumberOfJobInputs();
setNumberOfJobOutputs();
return jobGraph;
......
......@@ -57,8 +57,7 @@ public class BatchForwardLocal {
exec.start();
Client client = new Client(new InetSocketAddress("localhost",
6498), configuration);
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
......@@ -67,8 +66,8 @@ public class BatchForwardLocal {
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress(
"hadoop02.ilab.sztaki.hu", 6123), configuration);
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
......
......@@ -31,8 +31,8 @@ public class BatchWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("BatchWordCountSource", BatchWordCountSource.class);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2);
graphBuilder.setTask("BatchWordCountSplitter", BatchWordCountSplitter.class, 2, 1);
graphBuilder.setTask("BatchWordCountCounter", BatchWordCountCounter.class, 2, 1);
graphBuilder.setSink("BatchWordCountSink", BatchWordCountSink.class);
graphBuilder.shuffleConnect("BatchWordCountSource", "BatchWordCountSplitter");
......
......@@ -32,7 +32,7 @@ public class CellInfoLocal {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("infoSource", InfoSource.class);
graphBuilder.setSource("querySource", QuerySource.class);
graphBuilder.setTask("cellTask", CellTask.class, 3);
graphBuilder.setTask("cellTask", CellTask.class, 3, 1);
graphBuilder.setSink("sink", CellSink.class);
graphBuilder.fieldsConnect("infoSource", "cellTask", 0);
......
......@@ -32,13 +32,13 @@ public class WindowWordCountLocal {
public static JobGraph getJobGraph() {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WindowWordCountSource", WindowWordCountSource.class);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1);
graphBuilder.setTask("WindowWordCountSplitter", WindowWordCountSplitter.class, 1, 1);
graphBuilder.setTask("WindowWordCountCounter", WindowWordCountCounter.class, 1, 1);
graphBuilder.setSink("WindowWordCountSink", WindowWordCountSink.class);
graphBuilder.broadcastConnect("WindowWordCountSource", "WindowWordCountSplitter");
graphBuilder.shuffleConnect("WindowWordCountSource", "WindowWordCountSplitter");
graphBuilder.fieldsConnect("WindowWordCountSplitter", "WindowWordCountCounter", 0);
graphBuilder.broadcastConnect("WindowWordCountCounter", "WindowWordCountSink");
graphBuilder.shuffleConnect("WindowWordCountCounter", "WindowWordCountSink");
return graphBuilder.getJobGraph();
}
......
......@@ -32,7 +32,7 @@ public class WordCountDummySource extends UserSourceInvokable {
if (i % 2 == 0) {
record.setString(0, "Gyula Marci");
} else {
record.setString(0, "Gabor Gyula");
record.setString(0, "Gabor Frank");
}
emit(record);
}
......
......@@ -31,7 +31,7 @@ public class WordCountDummySource2 extends UserSourceInvokable {
@Override
public void invoke() throws Exception {
for (int i = 0; i < 1000000; i++) {
for (int i = 0; i < 100000; i++) {
if (i % 50000 == 0) {
time = System.currentTimeMillis();
System.out.println("Source:\t\t" + i + "\t\tTime: " + (time - prevTime));
......@@ -41,7 +41,7 @@ public class WordCountDummySource2 extends UserSourceInvokable {
if (i % 2 == 0) {
record.setString(0, "Gyula Marci");
} else {
record.setString(0, "Gabor Gyula");
record.setString(0, "Gabor Frank");
}
emit(record);
}
......
......@@ -31,21 +31,21 @@ public class WordCountLocal {
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2, 1);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
graphBuilder.fieldsConnect("WordCountSplitter", "WordCountCounter", 0);
graphBuilder.shuffleConnect("WordCountCounter", "WordCountSink");
return graphBuilder.getJobGraph();
}
//TODO: arguments check
// TODO: arguments check
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
try {
JobGraph jG = getJobGraph();
......@@ -62,14 +62,15 @@ public class WordCountLocal {
exec.start();
Client client = new Client(new InetSocketAddress("localhost", 6498), configuration);
client.run(jG, true);
exec.stop();
} else if (args[0].equals("cluster")) {
System.out.println("Running in Cluster2 mode");
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), configuration);
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
}
......
......@@ -33,8 +33,8 @@ public class WordCountRemote {
private static JobGraph getJobGraph() throws Exception {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("WordCountSource", WordCountDummySource2.class);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2);
graphBuilder.setTask("WordCountSplitter", WordCountSplitter.class, 2, 2);
graphBuilder.setTask("WordCountCounter", WordCountCounter.class, 2, 1);
graphBuilder.setSink("WordCountSink", WordCountSink.class);
graphBuilder.shuffleConnect("WordCountSource", "WordCountSplitter");
......@@ -45,7 +45,7 @@ public class WordCountRemote {
}
public static void main(String[] args) {
LogUtils.initializeDefaultConsoleLogger(Level.DEBUG, Level.INFO);
LogUtils.initializeDefaultConsoleLogger(Level.ERROR, Level.INFO);
try {
File file = new File("target/stratosphere-streaming-0.5-SNAPSHOT.jar");
......@@ -56,7 +56,8 @@ public class WordCountRemote {
jG.addJar(new Path(file.getAbsolutePath()));
Configuration configuration = jG.getJobConfiguration();
Client client = new Client(new InetSocketAddress("hadoop02.ilab.sztaki.hu", 6123), configuration);
Client client = new Client(new InetSocketAddress("hadoop00.ilab.sztaki.hu", 6123),
configuration);
client.run(jG, true);
} catch (Exception e) {
System.out.println(e);
......
......@@ -49,6 +49,7 @@ public class StreamComponentTest {
public static class MySource extends UserSourceInvokable {
public MySource() {
}
StreamRecord record = new StreamRecord(new Tuple1<Integer>());
@Override
......@@ -98,7 +99,7 @@ public class StreamComponentTest {
JobGraphBuilder graphBuilder = new JobGraphBuilder("testGraph");
graphBuilder.setSource("MySource", MySource.class);
graphBuilder.setTask("MyTask", MyTask.class, 2);
graphBuilder.setTask("MyTask", MyTask.class, 2, 2);
graphBuilder.setSink("MySink", MySink.class);
graphBuilder.shuffleConnect("MySource", "MyTask");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册