/***********************************************************************************************************************
 *
 * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
 * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations under the License.
 *
 **********************************************************************************************************************/

package eu.stratosphere.streaming.api;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.streaming.api.invokable.UserInvokable;
import eu.stratosphere.streaming.api.invokable.UserSinkInvokable;
import eu.stratosphere.streaming.api.invokable.UserSourceInvokable;
import eu.stratosphere.streaming.api.invokable.UserTaskInvokable;
import eu.stratosphere.streaming.api.streamcomponent.StreamSink;
import eu.stratosphere.streaming.api.streamcomponent.StreamSource;
import eu.stratosphere.streaming.api.streamcomponent.StreamTask;
import eu.stratosphere.streaming.api.streamrecord.StreamRecord;
import eu.stratosphere.streaming.partitioner.BroadcastPartitioner;
import eu.stratosphere.streaming.partitioner.FieldsPartitioner;
import eu.stratosphere.streaming.partitioner.GlobalPartitioner;
import eu.stratosphere.streaming.partitioner.ShufflePartitioner;

/**
 * Object for building Stratosphere stream processing job graphs
 */
G
Gyula Fora 已提交
52
public class JobGraphBuilder {
53

G
gaborhermann 已提交
54
	private static final Log log = LogFactory.getLog(JobGraphBuilder.class);
G
Gyula Fora 已提交
55 56
	private final JobGraph jobGraph;
	private Map<String, AbstractJobVertex> components;
57
	private Map<String, Integer> numberOfInstances;
58
	private Map<String, List<Integer>> numberOfOutputChannels;
G
gyfora 已提交
59 60
	private String maxParallelismVertexName;
	private int maxParallelism;
61

G
Gyula Fora 已提交
62 63 64 65
	/**
	 * Creates a new JobGraph with the given name
	 * 
	 * @param jobGraphName
66
	 *            Name of the JobGraph
G
Gyula Fora 已提交
67 68 69 70
	 */
	public JobGraphBuilder(String jobGraphName) {
		jobGraph = new JobGraph(jobGraphName);
		components = new HashMap<String, AbstractJobVertex>();
71
		numberOfInstances = new HashMap<String, Integer>();
G
Gyula Fora 已提交
72
		numberOfOutputChannels = new HashMap<String, List<Integer>>();
G
gyfora 已提交
73 74
		maxParallelismVertexName = "";
		maxParallelism = 0;
G
gaborhermann 已提交
75
		log.debug("JobGraph created");
G
Gyula Fora 已提交
76 77 78 79 80 81
	}

	/**
	 * Adds a source component to the JobGraph
	 * 
	 * @param sourceName
82
	 *            Name of the source component
G
Gyula Fora 已提交
83
	 * @param InvokableClass
84
	 *            User defined class describing the source
G
Gyula Fora 已提交
85
	 */
86 87
	public void setSource(String sourceName,
			final Class<? extends UserSourceInvokable> InvokableClass) {
G
gyfora 已提交
88
		setSource(sourceName, InvokableClass, 1, 1);
G
Gyula Fora 已提交
89 90
	}

91 92 93 94 95 96 97 98 99
	/**
	 * Adds a source component to the JobGraph
	 * 
	 * @param sourceName
	 *            Name of the source component
	 * @param InvokableClass
	 *            User defined class describing the source
	 * @param parallelism
	 *            Number of task instances of this type to run in parallel
M
Márton Balassi 已提交
100 101
	 * @param subtasksPerInstance
	 *            Number of subtasks allocated to a machine
102
	 */
103
	public void setSource(String sourceName,
G
gyfora 已提交
104 105
			final Class<? extends UserSourceInvokable> InvokableClass, int parallelism,
			int subtasksPerInstance) {
106 107
		final JobInputVertex source = new JobInputVertex(sourceName, jobGraph);
		source.setInputClass(StreamSource.class);
G
gyfora 已提交
108
		setComponent(sourceName, InvokableClass, parallelism, subtasksPerInstance, source);
G
gaborhermann 已提交
109
		log.debug("SOURCE: " + sourceName);
110 111
	}

G
Gyula Fora 已提交
112 113 114 115
	/**
	 * Adds a task component to the JobGraph
	 * 
	 * @param taskName
116
	 *            Name of the task component
G
Gyula Fora 已提交
117
	 * @param InvokableClass
118 119
	 *            User defined class describing the task
	 */
G
gyfora 已提交
120 121
	public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass) {
		setTask(taskName, InvokableClass, 1, 1);
122 123 124 125 126 127 128 129 130
	}

	/**
	 * Adds a task component to the JobGraph
	 * 
	 * @param taskName
	 *            Name of the task component
	 * @param InvokableClass
	 *            User defined class describing the task
G
Gyula Fora 已提交
131
	 * @param parallelism
132
	 *            Number of task instances of this type to run in parallel
M
Márton Balassi 已提交
133 134
	 * @param subtasksPerInstance
	 *            Number of subtasks allocated to a machine
G
Gyula Fora 已提交
135
	 */
G
gyfora 已提交
136 137
	public void setTask(String taskName, final Class<? extends UserTaskInvokable> InvokableClass,
			int parallelism, int subtasksPerInstance) {
G
Gyula Fora 已提交
138 139
		final JobTaskVertex task = new JobTaskVertex(taskName, jobGraph);
		task.setTaskClass(StreamTask.class);
G
gyfora 已提交
140
		setComponent(taskName, InvokableClass, parallelism, subtasksPerInstance, task);
G
gaborhermann 已提交
141
		log.debug("TASK: " + taskName);
G
Gyula Fora 已提交
142 143 144 145 146 147
	}

	/**
	 * Adds a sink component to the JobGraph
	 * 
	 * @param sinkName
148
	 *            Name of the sink component
G
Gyula Fora 已提交
149
	 * @param InvokableClass
150
	 *            User defined class describing the sink
G
Gyula Fora 已提交
151
	 */
G
gyfora 已提交
152 153
	public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass) {
		setSink(sinkName, InvokableClass, 1, 1);
G
Gyula Fora 已提交
154 155
	}

156 157 158 159 160 161 162 163 164
	/**
	 * Adds a sink component to the JobGraph
	 * 
	 * @param sinkName
	 *            Name of the sink component
	 * @param InvokableClass
	 *            User defined class describing the sink
	 * @param parallelism
	 *            Number of task instances of this type to run in parallel
M
Márton Balassi 已提交
165 166
	 * @param subtasksPerInstance
	 *            Number of subtasks allocated to a machine
167
	 */
G
gyfora 已提交
168 169
	public void setSink(String sinkName, final Class<? extends UserSinkInvokable> InvokableClass,
			int parallelism, int subtasksPerInstance) {
170 171
		final JobOutputVertex sink = new JobOutputVertex(sinkName, jobGraph);
		sink.setOutputClass(StreamSink.class);
G
gyfora 已提交
172
		setComponent(sinkName, InvokableClass, parallelism, subtasksPerInstance, sink);
173
		log.debug("SINK: " + sinkName);
174
	}
175

176
	private void setComponent(String componentName,
G
gyfora 已提交
177 178
			final Class<? extends UserInvokable> InvokableClass, int parallelism,
			int subtasksPerInstance, AbstractJobVertex component) {
179
		component.setNumberOfSubtasks(parallelism);
G
gyfora 已提交
180
		component.setNumberOfSubtasksPerInstance(subtasksPerInstance);
181

G
gyfora 已提交
182 183 184 185 186 187
		if (parallelism > maxParallelism) {
			maxParallelism = parallelism;
			maxParallelismVertexName = componentName;
		}

		Configuration config = new TaskConfig(component.getConfiguration()).getConfiguration();
188 189 190 191 192
		config.setClass("userfunction", InvokableClass);
		config.setString("componentName", componentName);
		components.put(componentName, component);
		numberOfInstances.put(componentName, parallelism);
	}
193

G
Gyula Fora 已提交
194 195 196 197 198
	/**
	 * Connects to JobGraph components with the given names, partitioning and
	 * channel type
	 * 
	 * @param upStreamComponentName
199
	 *            Name of the upstream component, that will emit the records
G
Gyula Fora 已提交
200
	 * @param downStreamComponentName
201 202
	 *            Name of the downstream component, that will receive the
	 *            records
G
Gyula Fora 已提交
203
	 * @param PartitionerClass
204
	 *            Class of the partitioner
G
Gyula Fora 已提交
205
	 * @param channelType
206
	 *            Channel Type
G
Gyula Fora 已提交
207
	 */
G
gyfora 已提交
208 209
	private void connect(String upStreamComponentName, String downStreamComponentName,
			Class<? extends ChannelSelector<StreamRecord>> PartitionerClass) {
G
Gyula Fora 已提交
210

G
gyfora 已提交
211 212
		AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
		AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
G
Gyula Fora 已提交
213 214

		try {
G
gyfora 已提交
215 216 217
			upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
			Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
					.getConfiguration();
218
			config.setClass(
G
gyfora 已提交
219
					"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
G
Gyula Fora 已提交
220
					PartitionerClass);
221 222
			log.debug("CONNECTED: " + PartitionerClass.getSimpleName() + " - "
					+ upStreamComponentName + " -> " + downStreamComponentName);
G
Gyula Fora 已提交
223
		} catch (JobGraphDefinitionException e) {
G
gyfora 已提交
224 225
			log.error("Cannot connect components with " + PartitionerClass.getSimpleName() + " : "
					+ upStreamComponentName + " -> " + downStreamComponentName, e);
G
Gyula Fora 已提交
226 227 228
		}
	}

229 230 231
	public void setInstanceSharing(String component1, String component2) {
		AbstractJobVertex c1 = components.get(component1);
		AbstractJobVertex c2 = components.get(component2);
M
Márton Balassi 已提交
232

233
		c1.setVertexToShareInstancesWith(c2);
234 235 236
	}

	public void setAutomaticInstanceSharing() {
G
gyfora 已提交
237 238 239 240 241 242 243 244 245 246 247

		AbstractJobVertex maxParallelismVertex = components.get(maxParallelismVertexName);

		for (String componentName : components.keySet()) {
			if (componentName != maxParallelismVertexName) {
				components.get(componentName).setVertexToShareInstancesWith(maxParallelismVertex);
			}
		}

	}

G
Gyula Fora 已提交
248 249 250 251 252 253 254
	/**
	 * Connects two components with the given names by broadcast partitioning.
	 * <p>
	 * Broadcast partitioning: All the emmitted tuples are replicated to all of
	 * the output instances
	 * 
	 * @param upStreamComponentName
255
	 *            Name of the upstream component, that will emit the records
G
Gyula Fora 已提交
256
	 * @param downStreamComponentName
257 258
	 *            Name of the downstream component, that will receive the
	 *            records
G
Gyula Fora 已提交
259
	 */
G
gyfora 已提交
260 261 262
	public void broadcastConnect(String upStreamComponentName, String downStreamComponentName) {
		connect(upStreamComponentName, downStreamComponentName, BroadcastPartitioner.class);
		addOutputChannels(upStreamComponentName, numberOfInstances.get(downStreamComponentName));
G
Gyula Fora 已提交
263 264 265
	}

	/**
266 267
	 * Connects two components with the given names by fields partitioning on
	 * the given field.
G
Gyula Fora 已提交
268 269 270 271 272
	 * <p>
	 * Fields partitioning: Tuples are hashed by the given key, and grouped to
	 * outputs accordingly
	 * 
	 * @param upStreamComponentName
273
	 *            Name of the upstream component, that will emit the records
G
Gyula Fora 已提交
274
	 * @param downStreamComponentName
275 276
	 *            Name of the downstream component, that will receive the
	 *            records
G
Gyula Fora 已提交
277
	 * @param keyPosition
278
	 *            Position of key in the tuple
G
Gyula Fora 已提交
279
	 */
G
gyfora 已提交
280 281
	public void fieldsConnect(String upStreamComponentName, String downStreamComponentName,
			int keyPosition) {
G
Gyula Fora 已提交
282

G
gyfora 已提交
283 284
		AbstractJobVertex upStreamComponent = components.get(upStreamComponentName);
		AbstractJobVertex downStreamComponent = components.get(downStreamComponentName);
G
Gyula Fora 已提交
285 286

		try {
G
gyfora 已提交
287
			upStreamComponent.connectTo(downStreamComponent, ChannelType.NETWORK);
G
Gyula Fora 已提交
288

G
gyfora 已提交
289 290
			Configuration config = new TaskConfig(upStreamComponent.getConfiguration())
					.getConfiguration();
G
Gyula Fora 已提交
291

292
			config.setClass(
G
gyfora 已提交
293
					"partitionerClass_" + (upStreamComponent.getNumberOfForwardConnections() - 1),
G
Gyula Fora 已提交
294 295
					FieldsPartitioner.class);

296 297
			config.setInteger(
					"partitionerIntParam_"
G
gyfora 已提交
298
							+ (upStreamComponent.getNumberOfForwardConnections() - 1), keyPosition);
G
Gyula Fora 已提交
299

G
Gyula Fora 已提交
300
			addOutputChannels(upStreamComponentName, 1);
G
gyfora 已提交
301 302
			log.debug("CONNECTED: FIELD PARTITIONING - " + upStreamComponentName + " -> "
					+ downStreamComponentName + ", KEY: " + keyPosition);
G
Gyula Fora 已提交
303
		} catch (JobGraphDefinitionException e) {
G
gyfora 已提交
304 305
			log.error("Cannot connect components by field: " + upStreamComponentName + " to "
					+ downStreamComponentName, e);
G
Gyula Fora 已提交
306 307 308 309 310 311
		}
	}

	/**
	 * Connects two components with the given names by global partitioning.
	 * <p>
312 313
	 * Global partitioning: sends all emitted records to one output instance
	 * (i.e. the first one)
G
Gyula Fora 已提交
314 315
	 * 
	 * @param upStreamComponentName
316
	 *            Name of the upstream component, that will emit the records
G
Gyula Fora 已提交
317
	 * @param downStreamComponentName
318 319
	 *            Name of the downstream component, that will receive the
	 *            records
G
Gyula Fora 已提交
320
	 */
G
gyfora 已提交
321 322
	public void globalConnect(String upStreamComponentName, String downStreamComponentName) {
		connect(upStreamComponentName, downStreamComponentName, GlobalPartitioner.class);
G
Gyula Fora 已提交
323
		addOutputChannels(upStreamComponentName, 1);
G
Gyula Fora 已提交
324 325 326 327 328 329 330 331 332
	}

	/**
	 * Connects two components with the given names by shuffle partitioning.
	 * <p>
	 * Shuffle partitioning: sends the output records to a randomly selected
	 * channel
	 * 
	 * @param upStreamComponentName
333
	 *            Name of the upstream component, that will emit the records
G
Gyula Fora 已提交
334
	 * @param downStreamComponentName
335 336
	 *            Name of the downstream component, that will receive the
	 *            records
G
Gyula Fora 已提交
337
	 */
G
gyfora 已提交
338 339
	public void shuffleConnect(String upStreamComponentName, String downStreamComponentName) {
		connect(upStreamComponentName, downStreamComponentName, ShufflePartitioner.class);
G
Gyula Fora 已提交
340
		addOutputChannels(upStreamComponentName, 1);
341 342
	}

G
gyfora 已提交
343
	private void addOutputChannels(String upStreamComponentName, int numOfInstances) {
344
		if (numberOfOutputChannels.containsKey(upStreamComponentName)) {
G
gyfora 已提交
345
			numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
346
		} else {
G
gyfora 已提交
347 348
			numberOfOutputChannels.put(upStreamComponentName, new ArrayList<Integer>());
			numberOfOutputChannels.get(upStreamComponentName).add(numOfInstances);
349
		}
G
Gyula Fora 已提交
350 351 352 353
	}

	private void setNumberOfJobInputs() {
		for (AbstractJobVertex component : components.values()) {
354 355
			component.getConfiguration().setInteger("numberOfInputs",
					component.getNumberOfBackwardConnections());
G
Gyula Fora 已提交
356 357 358 359 360
		}
	}

	private void setNumberOfJobOutputs() {
		for (AbstractJobVertex component : components.values()) {
361 362
			component.getConfiguration().setInteger("numberOfOutputs",
					component.getNumberOfForwardConnections());
G
Gyula Fora 已提交
363
		}
G
Gyula Fora 已提交
364

365
		for (String component : numberOfOutputChannels.keySet()) {
366
			Configuration config = components.get(component).getConfiguration();
G
gyfora 已提交
367
			List<Integer> channelNumList = numberOfOutputChannels.get(component);
G
Gyula Fora 已提交
368 369
			for (int i = 0; i < channelNumList.size(); i++) {
				config.setInteger("channels_" + i, channelNumList.get(i));
370
			}
371
		}
G
Gyula Fora 已提交
372 373 374 375 376 377
	}

	/**
	 * @return The JobGraph object
	 */
	public JobGraph getJobGraph() {
378
		setAutomaticInstanceSharing();
G
Gyula Fora 已提交
379 380 381 382
		setNumberOfJobInputs();
		setNumberOfJobOutputs();
		return jobGraph;
	}
G
Gyula Fora 已提交
383
}