提交 921278e3 编写于 作者: U uce 提交者: StephanEwen

Removed Nephele plugins

上级 a33e1a39
......@@ -26,13 +26,10 @@ import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileRequest
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheProfileResponse;
import eu.stratosphere.nephele.execution.librarycache.LibraryCacheUpdate;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.plugins.PluginID;
import eu.stratosphere.nephele.protocols.PluginCommunicationProtocol;
import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
import eu.stratosphere.nephele.taskmanager.TaskKillResult;
......@@ -67,11 +64,6 @@ public abstract class AbstractInstance extends NetworkNode {
*/
private TaskOperationProtocol taskManager = null;
/**
* Stores the RPC stub object for the instance's task manager plugin component.
*/
private PluginCommunicationProtocol taskManagerPluginComponent = null;
/**
* Constructs an abstract instance object.
*
......@@ -125,36 +117,6 @@ public abstract class AbstractInstance extends NetworkNode {
}
}
/**
* Creates or returns the RPC stub object for the instance's task manager plugin component.
*
* @return the RPC stub object for the instance's task manager plugin component
* @throws IOException
* thrown if the RPC stub object for the task manager plugin component cannot be created
*/
private PluginCommunicationProtocol getTaskManagerPluginProxy() throws IOException {
if (this.taskManagerPluginComponent == null) {
this.taskManagerPluginComponent = RPC.getProxy(PluginCommunicationProtocol.class,
new InetSocketAddress(getInstanceConnectionInfo().getAddress(),
getInstanceConnectionInfo().getIPCPort()), NetUtils.getSocketFactory());
}
return this.taskManagerPluginComponent;
}
/**
* Destroys and removes the RPC stub object for this instance's task manager plugin component.
*/
private void destroyTaskManagerPluginProxy() {
if (this.taskManagerPluginComponent != null) {
RPC.stopProxy(this.taskManagerPluginComponent);
this.taskManagerPluginComponent = null;
}
}
/**
* Returns the type of the instance.
*
......@@ -318,38 +280,6 @@ public abstract class AbstractInstance extends NetworkNode {
getTaskManagerProxy().killTaskManager();
}
/**
* Connects to the plugin component of this instance's task manager and sends data to the plugin with the given ID.
*
* @param pluginID
* the ID of the plugin to send data to
* @param data
* the data to send
* @throws IOException
* thrown if an error occurs while sending the data from the plugin
*/
public synchronized void sendData(final PluginID pluginID, final IOReadableWritable data) throws IOException {
getTaskManagerPluginProxy().sendData(pluginID, data);
}
/**
* Connects to the plugin component of this instance's task manager and requests data from the plugin with the given
* ID.
*
* @param pluginID
* the ID of the plugin to request data from
* @param data
* data to specify the request
* @return the requested data, possibly <code>null</code>
* @throws IOException
* thrown if an error occurs while requesting the data from the plugin
*/
public synchronized IOReadableWritable requestData(PluginID pluginID, IOReadableWritable data) throws IOException {
return getTaskManagerPluginProxy().requestData(pluginID, data);
}
/**
* Invalidates the entries identified by the given channel IDs from the remote task manager's receiver lookup cache.
*
......@@ -369,7 +299,6 @@ public abstract class AbstractInstance extends NetworkNode {
public synchronized void destroyProxies() {
destroyTaskManagerProxy();
destroyTaskManagerPluginProxy();
}
}
......@@ -102,9 +102,6 @@ import eu.stratosphere.nephele.jobmanager.splitassigner.InputSplitWrapper;
import eu.stratosphere.nephele.managementgraph.ManagementGraph;
import eu.stratosphere.nephele.managementgraph.ManagementVertexID;
import eu.stratosphere.nephele.multicast.MulticastManager;
import eu.stratosphere.nephele.plugins.JobManagerPlugin;
import eu.stratosphere.nephele.plugins.PluginID;
import eu.stratosphere.nephele.plugins.PluginManager;
import eu.stratosphere.nephele.profiling.JobManagerProfiler;
import eu.stratosphere.nephele.profiling.ProfilingListener;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
......@@ -112,7 +109,6 @@ import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
import eu.stratosphere.nephele.protocols.ExtendedManagementProtocol;
import eu.stratosphere.nephele.protocols.InputSplitProviderProtocol;
import eu.stratosphere.nephele.protocols.JobManagerProtocol;
import eu.stratosphere.nephele.protocols.PluginCommunicationProtocol;
import eu.stratosphere.nephele.taskmanager.AbstractTaskResult;
import eu.stratosphere.nephele.taskmanager.TaskCancelResult;
import eu.stratosphere.nephele.taskmanager.TaskExecutionState;
......@@ -136,7 +132,7 @@ import eu.stratosphere.nephele.util.StringUtils;
* @author warneke
*/
public class JobManager implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol,
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, PluginCommunicationProtocol {
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener {
public static enum ExecutionMode { LOCAL, CLUSTER }
......@@ -158,8 +154,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
private InstanceManager instanceManager;
private final Map<PluginID, JobManagerPlugin> jobManagerPlugins;
private final int recommendedClientPollingInterval;
private final ExecutorService executorService = Executors.newCachedThreadPool();
......@@ -171,13 +165,8 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
private volatile boolean isShutDown = false;
public JobManager(ExecutionMode executionMode) {
this(executionMode, null);
}
public JobManager(ExecutionMode executionMode, final String pluginsDir) {
final String ipcAddressString = GlobalConfiguration
.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
......@@ -230,13 +219,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
LOG.info("Starting job manager in " + executionMode + " mode");
// Load the plugins
if (pluginsDir != null) {
this.jobManagerPlugins = PluginManager.getJobManagerPlugins(this, pluginsDir);
} else {
this.jobManagerPlugins = Collections.emptyMap();
}
// Try to load the instance manager for the given execution mode
// Try to load the scheduler for the given execution mode
if (executionMode == ExecutionMode.LOCAL) {
......@@ -347,12 +329,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
}
// Stop the plugins
final Iterator<JobManagerPlugin> it = this.jobManagerPlugins.values().iterator();
while (it.hasNext()) {
it.next().shutdown();
}
// Stop and clean up the job progress collector
if (this.eventCollector != null) {
this.eventCollector.shutdown();
......@@ -413,7 +389,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
GlobalConfiguration.loadConfiguration(configDir);
// Create a new job manager object
JobManager jobManager = new JobManager(executionMode, configDir);
JobManager jobManager = new JobManager(executionMode);
// Run the main task loop
jobManager.runTaskLoop();
......@@ -500,30 +476,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
jobRunsWithProfiling = true;
}
// Allow plugins to rewrite the job graph
Iterator<JobManagerPlugin> it = this.jobManagerPlugins.values().iterator();
while (it.hasNext()) {
final JobManagerPlugin plugin = it.next();
if (plugin.requiresProfiling() && !jobRunsWithProfiling) {
LOG.debug("Skipping job graph rewrite by plugin " + plugin + " because job " + job.getJobID()
+ " will not be executed with profiling");
continue;
}
final JobGraph inputJob = job;
job = plugin.rewriteJobGraph(inputJob);
if (job == null) {
if (LOG.isWarnEnabled()) {
LOG.warn("Plugin " + plugin + " set job graph to null, reverting changes...");
}
job = inputJob;
}
if (job != inputJob && LOG.isDebugEnabled()) {
LOG.debug("Plugin " + plugin + " rewrote job graph");
}
}
// Try to create initial execution graph from job graph
LOG.info("Creating initial execution graph from job graph " + job.getName());
ExecutionGraph eg = null;
......@@ -535,28 +487,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return result;
}
// Allow plugins to rewrite the execution graph
it = this.jobManagerPlugins.values().iterator();
while (it.hasNext()) {
final JobManagerPlugin plugin = it.next();
if (plugin.requiresProfiling() && !jobRunsWithProfiling) {
LOG.debug("Skipping execution graph rewrite by plugin " + plugin + " because job " + job.getJobID()
+ " will not be executed with profiling");
continue;
}
final ExecutionGraph inputGraph = eg;
eg = plugin.rewriteExecutionGraph(inputGraph);
if (eg == null) {
LOG.warn("Plugin " + plugin + " set execution graph to null, reverting changes...");
eg = inputGraph;
}
if (eg != inputGraph) {
LOG.debug("Plugin " + plugin + " rewrote execution graph");
}
}
// Register job with the progress collector
if (this.eventCollector != null) {
this.eventCollector.registerJob(eg, jobRunsWithProfiling, System.currentTimeMillis());
......@@ -569,16 +499,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
if (this.eventCollector != null) {
this.profiler.registerForProfilingData(eg.getJobID(), this.eventCollector);
}
// Allow plugins to register their own profiling listeners for the job
it = this.jobManagerPlugins.values().iterator();
while (it.hasNext()) {
final ProfilingListener listener = it.next().getProfilingListener(eg.getJobID());
if (listener != null) {
this.profiler.registerForProfilingData(eg.getJobID(), listener);
}
}
}
// Register job with the dynamic input split assigner
......@@ -1249,34 +1170,4 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return new InputSplitWrapper(jobID, this.inputSplitManager.getNextInputSplit(vertex, sequenceNumber.getValue()));
}
/**
* {@inheritDoc}
*/
@Override
public void sendData(final PluginID pluginID, final IOReadableWritable data) throws IOException {
final JobManagerPlugin jmp = this.jobManagerPlugins.get(pluginID);
if (jmp == null) {
LOG.error("Cannot find job manager plugin for plugin ID " + pluginID);
return;
}
jmp.sendData(data);
}
/**
* {@inheritDoc}
*/
@Override
public IOReadableWritable requestData(final PluginID pluginID, final IOReadableWritable data) throws IOException {
final JobManagerPlugin jmp = this.jobManagerPlugins.get(pluginID);
if (jmp == null) {
LOG.error("Cannot find job manager plugin for plugin ID " + pluginID);
return null;
}
return jmp.requestData(data);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins;
import eu.stratosphere.nephele.configuration.Configuration;
/**
* This abstract class must be inherited by each plugin for Nephele. It specifies how to instantiate the individual
* plugin components and provides access to the plugin environment, for example the plugin configuration.
* <p>
* This class is thread-safe.
*
* @author warneke
*/
public abstract class AbstractPluginLoader {
/**
* The name of the configuration as specified in the plugin configuration file
*/
private final String pluginName;
/**
* The configuration for this plugin.
*/
private final Configuration pluginConfiguration;
/**
* A service to lookup the location of the plugin's remote components.
*/
private final PluginLookupService pluginLookupService;
/**
* Constructs the plugin loader.
*
* @param pluginName
* the name of the plugin as specified in the plugin configuration file
* @param pluginConfiguration
* the plugin configuration
* @param pluginLookupService
* a service to lookup the location of the plugin's remote components
*/
public AbstractPluginLoader(final String pluginName, final Configuration pluginConfiguration,
final PluginLookupService pluginLookupService) {
this.pluginName = pluginName;
this.pluginConfiguration = pluginConfiguration;
this.pluginLookupService = pluginLookupService;
}
/**
* Returns the {@link Configuration} for this plugin.
*
* @return the {@link Configuration} for this plugin
*/
protected final Configuration getPluginConfiguration() {
return this.pluginConfiguration;
}
/**
* Returns the name of the plugin as specified in the plugin configuration file.
*
* @return the anem of the plugin as specified in the plugin configuration file
*/
final String getPluginName() {
return this.pluginName;
}
/**
* Returns a service through which a plugin can determine the location of its remote components.
*
* @return a service through which a plugin can determine the location of its remote components
*/
protected final PluginLookupService getPluginLookupService() {
return this.pluginLookupService;
}
/**
* Returns an ID which uniquely specifies the plugin.
*
* @return an ID which uniquely specified the plugin
*/
public abstract PluginID getPluginID();
/**
* Loads and returns the plugin component which is supposed to run inside Nephele's {@link JobManager}.
*
* @return the {@link JobManager} plugin component or <code>null</code> if this plugin does not provide such a
* component.
*/
public abstract JobManagerPlugin getJobManagerPlugin();
/**
* Loads and returns the plugin component which is supposed to run inside Nephele's {@link TaskManager}.
*
* @return the {@link TaskManager} plugin component or <code>null</code> if this plugin does not provide such a
* component.
*/
public abstract TaskManagerPlugin getTaskManagerPlugin();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins;
import java.io.IOException;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.protocols.PluginCommunicationProtocol;
public final class JobManagerLookupService implements PluginLookupService {
private final PluginCommunicationProtocol jobManager;
JobManagerLookupService(final PluginCommunicationProtocol jobManager) {
this.jobManager = jobManager;
}
private static final class JobManagerStub implements PluginCommunication {
private final PluginCommunicationProtocol jobManager;
private final PluginID pluginID;
private JobManagerStub(final PluginCommunicationProtocol jobManager, final PluginID pluginID) {
this.jobManager = jobManager;
this.pluginID = pluginID;
}
/**
* {@inheritDoc}
*/
@Override
public void sendData(final IOReadableWritable data) throws IOException {
synchronized (this.jobManager) {
this.jobManager.sendData(this.pluginID, data);
}
}
/**
* {@inheritDoc}
*/
@Override
public IOReadableWritable requestData(final IOReadableWritable data) throws IOException {
synchronized (this.jobManager) {
return this.jobManager.requestData(this.pluginID, data);
}
}
}
private final static class TaskManagerStub implements PluginCommunication {
private final AbstractInstance instance;
private final PluginID pluginID;
private TaskManagerStub(final AbstractInstance instance, final PluginID pluginID) {
this.instance = instance;
this.pluginID = pluginID;
}
/**
* {@inheritDoc}
*/
@Override
public void sendData(final IOReadableWritable data) throws IOException {
this.instance.sendData(this.pluginID, data);
}
/**
* {@inheritDoc}
*/
@Override
public IOReadableWritable requestData(final IOReadableWritable data) throws IOException {
return this.instance.requestData(this.pluginID, data);
}
}
/**
* {@inheritDoc}
*/
@Override
public PluginCommunication getJobManagerComponent(final PluginID pluginID) {
return new JobManagerStub(this.jobManager, pluginID);
}
/**
* {@inheritDoc}
*/
@Override
public PluginCommunication getTaskManagerComponent(final PluginID pluginID, final AbstractInstance instance) {
return new TaskManagerStub(instance, pluginID);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins;
import eu.stratosphere.nephele.executiongraph.ExecutionGraph;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.profiling.ProfilingListener;
/**
* This interface must be implemented by every plugin component which is supposed to run inside Nephele's job manager.
*
* @author warneke
*/
public interface JobManagerPlugin extends PluginCommunication {
/**
* Checks whether the plugin requires a job to be executed with profiling enabled in order to work properly.
*
* @return <code>true</code> if the job requires profiling to be enabled for a job, <code>false</code> otherwise
*/
boolean requiresProfiling();
/**
* This method is called upon the reception of a new job graph. It gives the plugin the possibility to to rewrite
* the job graph before it is processed further.
*
* @param jobGraph
* the original job graph
* @return the rewritten job graph
*/
JobGraph rewriteJobGraph(JobGraph jobGraph);
/**
* This method is called after the initial execution graph has been created from the user's job graph. It gives the
* plugin the possibility to rewrite the execution graph before it is processed further or to register to particular
* events.
*
* @param executionGraph
* the initial execution graph
* @return the rewritten execution graph
*/
ExecutionGraph rewriteExecutionGraph(ExecutionGraph executionGraph);
/**
* This method is called before the deployment of the execution graph. It provides the plugin with the possibility
* to return a custom {@link ProfilingListener} which is then registered with the profiling component. As a result,
* the plugin will receive profiling events during the job execution.
*
* @param jobID
* the ID of the job to return a profiling listener for
* @return the profiling listener for the job or <code>null</code> if the plugin does not want to receive profiling
* data for the job
*/
ProfilingListener getProfilingListener(JobID jobID);
/**
* Called by the job manager to indicate that Nephele is about to shut down.
*/
void shutdown();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins;
import java.io.IOException;
import eu.stratosphere.nephele.io.IOReadableWritable;
public interface PluginCommunication {
void sendData(IOReadableWritable data) throws IOException;
IOReadableWritable requestData(IOReadableWritable data) throws IOException;
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins;
import eu.stratosphere.nephele.io.AbstractID;
/**
* This class provides IDs to uniquely identify Nephele plugins.
*
* @author warneke
*/
public final class PluginID extends AbstractID {
/**
* Constructs a new plugin ID from the given byte array.
*
* @param byteArray
* the byte array to construct the plugin ID from
*/
private PluginID(final byte[] byteArray) {
super(byteArray);
}
/**
* Default constructor required for the deserialization.
*/
public PluginID() {
super();
}
/**
* Constructs a new plugin ID from the given byte array.
*
* @param byteArray
* the byte array to construct the plugin ID from
*/
public static PluginID fromByteArray(final byte[] byteArray) {
if (byteArray == null) {
throw new IllegalArgumentException("Argument byteArray must not be null");
}
if (byteArray.length != SIZE) {
throw new IllegalArgumentException("Provided byte array must have a length of " + SIZE);
}
return new PluginID(byteArray);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins;
import eu.stratosphere.nephele.instance.AbstractInstance;
public interface PluginLookupService {
PluginCommunication getJobManagerComponent(PluginID pluginID);
PluginCommunication getTaskManagerComponent(PluginID pluginID, AbstractInstance instance);
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.protocols.PluginCommunicationProtocol;
import eu.stratosphere.nephele.taskmanager.TaskManager;
import eu.stratosphere.nephele.util.StringUtils;
/**
* The plugin manager is responsible for loading and managing the individual plugins.
* <p>
* This class is thread-safe.
*
* @author warneke
*/
public final class PluginManager {
/**
* The log object used to report errors and information in general.
*/
private static final Log LOG = LogFactory.getLog(PluginManager.class);
/**
* The name of the file containing the plugin configuration.
*/
private static final String PLUGIN_CONFIG_FILE = "nephele-plugins.xml";
/**
* The singleton instance of this class.
*/
private static PluginManager INSTANCE = null;
private final Map<String, AbstractPluginLoader> plugins;
private PluginManager(final String configDir, final PluginLookupService pluginLookupService) {
// Check if the configuration file exists
final File configFile = new File(configDir + File.separator + PLUGIN_CONFIG_FILE);
if (configFile.exists()) {
this.plugins = loadPlugins(configFile, pluginLookupService);
} else {
this.plugins = Collections.emptyMap();
LOG.warn("Unable to load plugins: configuration file " + configFile.getAbsolutePath() + " not found");
}
}
private String getTextChild(final Node node) {
final NodeList nodeList = node.getChildNodes();
if (nodeList.getLength() != 1) {
return null;
}
final Node child = nodeList.item(0);
if (!(child instanceof Text)) {
return null;
}
final Text text = (Text) child;
return text.getNodeValue();
}
@SuppressWarnings("unchecked")
private Map<String, AbstractPluginLoader> loadPlugins(final File configFile,
final PluginLookupService pluginLookupService) {
final Map<String, AbstractPluginLoader> tmpPluginList = new LinkedHashMap<String, AbstractPluginLoader>();
final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
// Ignore comments in the XML file
docBuilderFactory.setIgnoringComments(true);
docBuilderFactory.setNamespaceAware(true);
try {
final DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
final Document doc = builder.parse(configFile);
if (doc == null) {
LOG.error("Unable to load plugins: doc is null");
return Collections.emptyMap();
}
final Element root = doc.getDocumentElement();
if (root == null) {
LOG.error("Unable to load plugins: root is null");
return Collections.emptyMap();
}
if (!"plugins".equals(root.getNodeName())) {
LOG.error("Unable to load plugins: unknown element " + root.getNodeName());
return Collections.emptyMap();
}
final NodeList pluginNodes = root.getChildNodes();
int pluginCounter = 0;
for (int i = 0; i < pluginNodes.getLength(); ++i) {
final Node pluginNode = pluginNodes.item(i);
// Ignore text at this point
if (pluginNode instanceof Text) {
continue;
}
if (!"plugin".equals(pluginNode.getNodeName())) {
LOG.error("Unable to load plugins: unknown element " + pluginNode.getNodeName());
continue;
}
// Increase plugin counter
++pluginCounter;
final NodeList pluginChildren = pluginNode.getChildNodes();
String pluginName = null;
String pluginClass = null;
Configuration pluginConfiguration = null;
for (int j = 0; j < pluginChildren.getLength(); ++j) {
final Node pluginChild = pluginChildren.item(j);
// Ignore text at this point
if (pluginChild instanceof Text) {
continue;
}
if ("name".equals(pluginChild.getNodeName())) {
pluginName = getTextChild(pluginChild);
if (pluginName == null) {
LOG.error("Skipping plugin " + pluginCounter
+ " from configuration because it does not provide a proper name");
continue;
}
}
if ("class".equals(pluginChild.getNodeName())) {
pluginClass = getTextChild(pluginChild);
if (pluginClass == null) {
LOG.error("Skipping plugin " + pluginCounter
+ " from configuration because it does not provide a loader class");
continue;
}
}
if ("configuration".equals(pluginChild.getNodeName())) {
pluginConfiguration = new Configuration();
final NodeList configurationNodes = pluginChild.getChildNodes();
for (int k = 0; k < configurationNodes.getLength(); ++k) {
final Node configurationNode = configurationNodes.item(k);
// Ignore text at this point
if (configurationNode instanceof Text) {
continue;
}
if (!"property".equals(configurationNode.getNodeName())) {
LOG.error("Unexpected node " + configurationNode.getNodeName() + ", skipping...");
continue;
}
String key = null;
String value = null;
final NodeList properties = configurationNode.getChildNodes();
for (int l = 0; l < properties.getLength(); ++l) {
final Node property = properties.item(l);
// Ignore text at this point
if (configurationNode instanceof Text) {
continue;
}
if ("key".equals(property.getNodeName())) {
key = getTextChild(property);
if (key == null) {
LOG.warn("Skipping configuration entry for plugin " + pluginName
+ " because of invalid key");
continue;
}
}
if ("value".equals(property.getNodeName())) {
value = getTextChild(property);
if (value == null) {
LOG.warn("Skipping configuration entry for plugin " + pluginName
+ " because of invalid value");
continue;
}
}
}
if (key != null && value != null) {
pluginConfiguration.setString(key, value);
}
}
}
}
if (pluginName == null) {
LOG.error("Plugin " + pluginCounter + " does not provide a name, skipping...");
continue;
}
if (pluginClass == null) {
LOG.error("Plugin " + pluginCounter + " does not provide a loader class, skipping...");
continue;
}
if (pluginConfiguration == null) {
LOG.warn("Plugin " + pluginCounter
+ " does not provide a configuration, using default configuration");
pluginConfiguration = new Configuration();
}
Class<? extends AbstractPluginLoader> loaderClass;
try {
loaderClass = (Class<? extends AbstractPluginLoader>) Class.forName(pluginClass);
} catch (ClassNotFoundException e) {
LOG.error("Unable to load plugin " + pluginName + ": " + StringUtils.stringifyException(e));
continue;
}
if (loaderClass == null) {
LOG.error("Unable to load plugin " + pluginName + ": loaderClass is null");
continue;
}
Constructor<? extends AbstractPluginLoader> constructor;
try {
constructor = (Constructor<? extends AbstractPluginLoader>) loaderClass
.getConstructor(String.class, Configuration.class, PluginLookupService.class);
} catch (SecurityException e) {
LOG.error("Unable to load plugin " + pluginName + ": " + StringUtils.stringifyException(e));
continue;
} catch (NoSuchMethodException e) {
LOG.error("Unable to load plugin " + pluginName + ": " + StringUtils.stringifyException(e));
continue;
}
if (constructor == null) {
LOG.error("Unable to load plugin " + pluginName + ": constructor is null");
continue;
}
AbstractPluginLoader pluginLoader = null;
try {
pluginLoader = constructor.newInstance(pluginName, pluginConfiguration, pluginLookupService);
} catch (IllegalArgumentException e) {
LOG.error("Unable to load plugin " + pluginName + ": " + StringUtils.stringifyException(e));
continue;
} catch (InstantiationException e) {
LOG.error("Unable to load plugin " + pluginName + ": " + StringUtils.stringifyException(e));
continue;
} catch (IllegalAccessException e) {
LOG.error("Unable to load plugin " + pluginName + ": " + StringUtils.stringifyException(e));
continue;
} catch (InvocationTargetException e) {
LOG.error("Unable to load plugin " + pluginName + ": " + StringUtils.stringifyException(e));
continue;
}
if (pluginLoader == null) {
LOG.error("Unable to load plugin " + pluginName + ": pluginLoader is null");
continue;
}
LOG.info("Successfully loaded plugin " + pluginName);
tmpPluginList.put(pluginName, pluginLoader);
}
} catch (IOException e) {
LOG.error("Error while loading plugins: " + StringUtils.stringifyException(e));
} catch (SAXException e) {
LOG.error("Error while loading plugins: " + StringUtils.stringifyException(e));
} catch (ParserConfigurationException e) {
LOG.error("Error while loading plugins: " + StringUtils.stringifyException(e));
}
return Collections.unmodifiableMap(tmpPluginList);
}
private static synchronized PluginManager getInstance(final String configDir,
final PluginLookupService pluginLookupService) {
if (INSTANCE == null) {
INSTANCE = new PluginManager(configDir, pluginLookupService);
}
return INSTANCE;
}
private Map<PluginID, JobManagerPlugin> getJobManagerPluginsInternal() {
final Map<PluginID, JobManagerPlugin> jobManagerPluginMap = new HashMap<PluginID, JobManagerPlugin>();
final Iterator<AbstractPluginLoader> it = this.plugins.values().iterator();
while (it.hasNext()) {
final AbstractPluginLoader apl = it.next();
final PluginID pluginID = apl.getPluginID();
final JobManagerPlugin jmp = apl.getJobManagerPlugin();
if (jmp != null) {
if (!jobManagerPluginMap.containsKey(pluginID)) {
jobManagerPluginMap.put(pluginID, jmp);
} else {
LOG.error("Detected ID collision for plugin " + apl.getPluginName() + ", skipping it...");
}
}
}
return Collections.unmodifiableMap(jobManagerPluginMap);
}
private Map<PluginID, TaskManagerPlugin> getTaskManagerPluginsInternal() {
final Map<PluginID, TaskManagerPlugin> taskManagerPluginMap = new HashMap<PluginID, TaskManagerPlugin>();
final Iterator<AbstractPluginLoader> it = this.plugins.values().iterator();
while (it.hasNext()) {
final AbstractPluginLoader apl = it.next();
final PluginID pluginID = apl.getPluginID();
final TaskManagerPlugin tmp = apl.getTaskManagerPlugin();
if (tmp != null) {
if (!taskManagerPluginMap.containsKey(pluginID)) {
taskManagerPluginMap.put(pluginID, tmp);
} else {
LOG.error("Detected ID collision for plugin " + apl.getPluginName() + ", skipping it...");
}
}
}
return Collections.unmodifiableMap(taskManagerPluginMap);
}
public static Map<PluginID, JobManagerPlugin> getJobManagerPlugins(final PluginCommunicationProtocol jobManager,
final String configDir) {
final JobManagerLookupService lookupService = new JobManagerLookupService(jobManager);
return getInstance(configDir, lookupService).getJobManagerPluginsInternal();
}
public static Map<PluginID, TaskManagerPlugin> getTaskManagerPlugins(final TaskManager taskManager,
final String configDir) {
final TaskManagerLookupService lookupService = new TaskManagerLookupService(taskManager);
return getInstance(configDir, lookupService).getTaskManagerPluginsInternal();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins;
import java.io.IOException;
import eu.stratosphere.nephele.instance.AbstractInstance;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.taskmanager.TaskManager;
public final class TaskManagerLookupService implements PluginLookupService {
private final TaskManager taskManager;
public TaskManagerLookupService(final TaskManager taskManager) {
this.taskManager = taskManager;
}
private static final class JobManagerStub implements PluginCommunication {
private final TaskManager taskManager;
private final PluginID pluginID;
private JobManagerStub(final TaskManager taskManager, final PluginID pluginID) {
this.taskManager = taskManager;
this.pluginID = pluginID;
}
/**
* {@inheritDoc}
*/
@Override
public void sendData(final IOReadableWritable data) throws IOException {
this.taskManager.sendDataToJobManager(this.pluginID, data);
}
/**
* {@inheritDoc}
*/
@Override
public IOReadableWritable requestData(final IOReadableWritable data) throws IOException {
return this.taskManager.requestDataFromJobManager(this.pluginID, data);
}
}
/**
* {@inheritDoc}
*/
@Override
public PluginCommunication getJobManagerComponent(final PluginID pluginID) {
return new JobManagerStub(this.taskManager, pluginID);
}
/**
* {@inheritDoc}
*/
@Override
public PluginCommunication getTaskManagerComponent(final PluginID pluginID, final AbstractInstance instance) {
throw new UnsupportedOperationException("getTaskManagerComponenet must not be called on this lookup service");
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
public interface TaskManagerPlugin extends PluginCommunication {
/**
* Registers a new incoming task with this task manager plugin.
*
* @param id
* the ID of the vertex representing the task
* @param jobConfiguration
* the job configuration
* @param environment
* the environment of the task
*/
void registerTask(ExecutionVertexID id, Configuration jobConfiguration, Environment environment);
/**
* Unregisters a finished, canceled, or failed task from this task manager plugin.
*
* @param id
* the ID of the vertex representing the task
* @param environment
* the environment of the task
*/
void unregisterTask(ExecutionVertexID id, Environment environment);
/**
* Called by the task manager to indicate that Nephele is about to shut down.
*/
void shutdown();
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins.wrapper;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.execution.Environment;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.GateID;
import eu.stratosphere.nephele.io.InputGate;
import eu.stratosphere.nephele.io.OutputGate;
import eu.stratosphere.nephele.io.RecordDeserializerFactory;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.InputSplitProvider;
import eu.stratosphere.nephele.types.Record;
/**
* This class provides an abstract base class for an environment wrapper. An environment wrapper can be used by a plugin
* to wrap a task's environment and intercept particular method calls. The default implementation of this abstract base
* class simply forwards every method call to the encapsulated environment.
* <p>
* This class is thread-safe.
*
* @author warneke
*/
public abstract class AbstractEnvironmentWrapper implements Environment {
private final Environment wrappedEnvironment;
/**
* Constructs a new abstract environment wrapper.
*
* @param wrappedEnvironment
* the environment to be wrapped
*/
public AbstractEnvironmentWrapper(final Environment wrappedEnvironment) {
if (wrappedEnvironment == null) {
throw new IllegalArgumentException("Argument wrappedEnvironment must not be null");
}
this.wrappedEnvironment = wrappedEnvironment;
}
/**
* Returns the wrapped environment.
*
* @return the wrapped environment
*/
protected Environment getWrappedEnvironment() {
return this.wrappedEnvironment;
}
/**
* {@inheritDoc}
*/
@Override
public JobID getJobID() {
return this.wrappedEnvironment.getJobID();
}
/**
* {@inheritDoc}
*/
@Override
public Configuration getTaskConfiguration() {
return this.wrappedEnvironment.getTaskConfiguration();
}
/**
* {@inheritDoc}
*/
@Override
public Configuration getJobConfiguration() {
return this.wrappedEnvironment.getJobConfiguration();
}
/**
* {@inheritDoc}
*/
@Override
public int getCurrentNumberOfSubtasks() {
return this.wrappedEnvironment.getCurrentNumberOfSubtasks();
}
/**
* {@inheritDoc}
*/
@Override
public int getIndexInSubtaskGroup() {
return this.wrappedEnvironment.getIndexInSubtaskGroup();
}
/**
* {@inheritDoc}
*/
@Override
public void userThreadStarted(final Thread userThread) {
this.wrappedEnvironment.userThreadStarted(userThread);
}
/**
* {@inheritDoc}
*/
@Override
public void userThreadFinished(final Thread userThread) {
this.wrappedEnvironment.userThreadStarted(userThread);
}
/**
* {@inheritDoc}
*/
@Override
public InputSplitProvider getInputSplitProvider() {
return this.wrappedEnvironment.getInputSplitProvider();
}
/**
* {@inheritDoc}
*/
@Override
public IOManager getIOManager() {
return this.wrappedEnvironment.getIOManager();
}
/**
* {@inheritDoc}
*/
@Override
public MemoryManager getMemoryManager() {
return this.wrappedEnvironment.getMemoryManager();
}
/**
* {@inheritDoc}
*/
@Override
public String getTaskName() {
return this.wrappedEnvironment.getTaskName();
}
/**
* {@inheritDoc}
*/
@Override
public GateID getNextUnboundInputGateID() {
return this.wrappedEnvironment.getNextUnboundInputGateID();
}
/**
* {@inheritDoc}
*/
@Override
public GateID getNextUnboundOutputGateID() {
return this.wrappedEnvironment.getNextUnboundOutputGateID();
}
/**
* {@inheritDoc}
*/
@Override
public int getNumberOfOutputGates() {
return this.wrappedEnvironment.getNumberOfOutputGates();
}
/**
* {@inheritDoc}
*/
@Override
public int getNumberOfInputGates() {
return this.wrappedEnvironment.getNumberOfInputGates();
}
/**
* {@inheritDoc}
*/
@Override
public <T extends Record> OutputGate<T> createOutputGate(final GateID gateID,
final Class<T> outputClass, final ChannelSelector<T> selector, final boolean isBroadcast)
{
return this.wrappedEnvironment.createOutputGate(gateID, outputClass, selector, isBroadcast);
}
/**
* {@inheritDoc}
*/
@Override
public <T extends Record> InputGate<T> createInputGate(final GateID gateID,
final RecordDeserializerFactory<T> deserializer)
{
return this.wrappedEnvironment.createInputGate(gateID, deserializer);
}
/**
* {@inheritDoc}
*/
@Override
public void registerOutputGate(final OutputGate<? extends Record> outputGate) {
this.wrappedEnvironment.registerOutputGate(outputGate);
}
/**
* {@inheritDoc}
*/
@Override
public void registerInputGate(final InputGate<? extends Record> inputGate) {
this.wrappedEnvironment.registerInputGate(inputGate);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins.wrapper;
import java.io.IOException;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.GateID;
import eu.stratosphere.nephele.io.InputGate;
import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryInputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkInputChannel;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.types.Record;
/**
* This class provides an abstract base class for an input gate wrapper. An input gate wrapper can be used by a plugin
* to wrap an input gate and intercept particular method calls. The default implementation of this abstract base class
* simply forwards every method call to the encapsulated input gate.
* <p>
* This class is thread-safe.
*
* @author warneke
* @param <T>
* the type of record transported through this input gate
*/
public abstract class AbstractInputGateWrapper<T extends Record> implements InputGate<T> {
/**
* The wrapped input gate.
*/
private final InputGate<T> wrappedInputGate;
/**
* Constructs a new abstract input gate wrapper.
*
* @param wrappedInputGate
* the input gate to be wrapped
*/
public AbstractInputGateWrapper(final InputGate<T> wrappedInputGate) {
if (wrappedInputGate == null) {
throw new IllegalArgumentException("Argument wrappedInputGate must not be null");
}
this.wrappedInputGate = wrappedInputGate;
}
/**
* Returns the wrapped input gate.
*
* @return the wrapped input gate
*/
protected InputGate<T> getWrappedInputGate() {
return this.wrappedInputGate;
}
/**
* {@inheritDoc}
*/
@Override
public int getIndex() {
return this.wrappedInputGate.getIndex();
}
/**
* {@inheritDoc}
*/
@Override
public void subscribeToEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType) {
this.wrappedInputGate.subscribeToEvent(eventListener, eventType);
}
/**
* {@inheritDoc}
*/
@Override
public void unsubscribeFromEvent(final EventListener eventListener,
final Class<? extends AbstractTaskEvent> eventType) {
this.wrappedInputGate.unsubscribeFromEvent(eventListener, eventType);
}
/**
* {@inheritDoc}
*/
@Override
public void publishEvent(final AbstractTaskEvent event) throws IOException, InterruptedException {
this.wrappedInputGate.publishEvent(event);
}
/**
* {@inheritDoc}
*/
@Override
public void deliverEvent(AbstractTaskEvent event) {
this.wrappedInputGate.deliverEvent(event);
}
/**
* {@inheritDoc}
*/
@Override
public JobID getJobID() {
return this.wrappedInputGate.getJobID();
}
/**
* {@inheritDoc}
*/
@Override
public ChannelType getChannelType() {
return this.wrappedInputGate.getChannelType();
}
/**
* {@inheritDoc}
*/
@Override
public GateID getGateID() {
return this.wrappedInputGate.getGateID();
}
/**
* {@inheritDoc}
*/
@Override
public void releaseAllChannelResources() {
this.wrappedInputGate.releaseAllChannelResources();
}
/**
* {@inheritDoc}
*/
@Override
public boolean isClosed() throws IOException, InterruptedException {
return this.wrappedInputGate.isClosed();
}
/**
* {@inheritDoc}
*/
@Override
public boolean isInputGate() {
return this.wrappedInputGate.isInputGate();
}
/**
* {@inheritDoc}
*/
@Override
public void setChannelType(final ChannelType channelType) {
this.wrappedInputGate.setChannelType(channelType);
}
/**
* {@inheritDoc}
*/
@Override
public T readRecord(final T target) throws IOException, InterruptedException {
return this.wrappedInputGate.readRecord(target);
}
/**
* {@inheritDoc}
*/
@Override
public int getNumberOfInputChannels() {
return this.wrappedInputGate.getNumberOfInputChannels();
}
/**
* {@inheritDoc}
*/
@Override
public AbstractInputChannel<T> getInputChannel(final int pos) {
return this.wrappedInputGate.getInputChannel(pos);
}
/**
* {@inheritDoc}
*/
@Override
public void notifyRecordIsAvailable(final int channelIndex) {
this.wrappedInputGate.notifyRecordIsAvailable(channelIndex);
}
/**
* {@inheritDoc}
*/
@Override
public void activateInputChannels() throws IOException, InterruptedException {
this.wrappedInputGate.activateInputChannels();
}
/**
* {@inheritDoc}
*/
@Override
public void close() throws IOException, InterruptedException {
this.wrappedInputGate.close();
}
/**
* {@inheritDoc}
*/
@Override
public NetworkInputChannel<T> createNetworkInputChannel(final InputGate<T> inputGate, final ChannelID channelID,
final ChannelID connectedChannelID) {
return this.wrappedInputGate.createNetworkInputChannel(inputGate, channelID, connectedChannelID);
}
/**
* {@inheritDoc}
*/
@Override
public InMemoryInputChannel<T> createInMemoryInputChannel(final InputGate<T> inputGate, final ChannelID channelID,
final ChannelID connectedChannelID) {
return this.wrappedInputGate.createInMemoryInputChannel(inputGate, channelID, connectedChannelID);
}
/**
* {@inheritDoc}
*/
@Override
public void removeAllInputChannels() {
this.wrappedInputGate.removeAllInputChannels();
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.plugins.wrapper;
import java.io.IOException;
import java.util.List;
import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
import eu.stratosphere.nephele.event.task.EventListener;
import eu.stratosphere.nephele.io.ChannelSelector;
import eu.stratosphere.nephele.io.GateID;
import eu.stratosphere.nephele.io.OutputGate;
import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.io.channels.bytebuffered.InMemoryOutputChannel;
import eu.stratosphere.nephele.io.channels.bytebuffered.NetworkOutputChannel;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.types.Record;
/**
* This class provides an abstract base class for an output gate wrapper. An output gate wrapper can be used by a plugin
* to wrap an output gate and intercept particular method calls. The default implementation of this abstract base class
* simply forwards every method call to the encapsulated output gate.
* <p>
* This class is thread-safe.
*
* @author warneke
* @param <T>
* the type of record transported through this output gate
*/
public abstract class AbstractOutputGateWrapper<T extends Record> implements OutputGate<T> {
/**
* The wrapped output gate.
*/
private final OutputGate<T> wrappedOutputGate;
/**
* Constructs a new abstract output gate wrapper.
*
* @param wrappedOutputGate
* the output gate to be wrapped
*/
public AbstractOutputGateWrapper(final OutputGate<T> wrappedOutputGate) {
if (wrappedOutputGate == null) {
throw new IllegalArgumentException("Argument wrappedOutputGate must not be null");
}
this.wrappedOutputGate = wrappedOutputGate;
}
/**
* Returns the wrapped output gate.
*
* @return the wrapped output gate
*/
protected OutputGate<T> getWrappedOutputGate() {
return this.wrappedOutputGate;
}
/**
* {@inheritDoc}
*/
@Override
public int getIndex() {
return this.wrappedOutputGate.getIndex();
}
/**
* {@inheritDoc}
*/
@Override
public void subscribeToEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType) {
this.wrappedOutputGate.subscribeToEvent(eventListener, eventType);
}
/**
* {@inheritDoc}
*/
@Override
public void unsubscribeFromEvent(final EventListener eventListener,
final Class<? extends AbstractTaskEvent> eventType) {
this.wrappedOutputGate.unsubscribeFromEvent(eventListener, eventType);
}
/**
* {@inheritDoc}
*/
@Override
public void publishEvent(final AbstractTaskEvent event) throws IOException, InterruptedException {
this.wrappedOutputGate.publishEvent(event);
}
/**
* {@inheritDoc}
*/
@Override
public void deliverEvent(final AbstractTaskEvent event) {
this.wrappedOutputGate.deliverEvent(event);
}
/**
* {@inheritDoc}
*/
@Override
public JobID getJobID() {
return this.wrappedOutputGate.getJobID();
}
/**
* {@inheritDoc}
*/
@Override
public ChannelType getChannelType() {
return this.wrappedOutputGate.getChannelType();
}
/**
* {@inheritDoc}
*/
@Override
public GateID getGateID() {
return this.wrappedOutputGate.getGateID();
}
/**
* {@inheritDoc}
*/
@Override
public void releaseAllChannelResources() {
this.wrappedOutputGate.releaseAllChannelResources();
}
/**
* {@inheritDoc}
*/
@Override
public boolean isClosed() throws IOException, InterruptedException {
return this.wrappedOutputGate.isClosed();
}
/**
* {@inheritDoc}
*/
@Override
public boolean isInputGate() {
return this.wrappedOutputGate.isInputGate();
}
/**
* {@inheritDoc}
*/
@Override
public void setChannelType(final ChannelType channelType) {
this.wrappedOutputGate.setChannelType(channelType);
}
/**
* {@inheritDoc}
*/
@Override
public Class<T> getType() {
return this.wrappedOutputGate.getType();
}
/**
* {@inheritDoc}
*/
@Override
public void writeRecord(final T record) throws IOException, InterruptedException {
this.wrappedOutputGate.writeRecord(record);
}
/**
* {@inheritDoc}
*/
@Override
public List<AbstractOutputChannel<T>> getOutputChannels() {
return this.wrappedOutputGate.getOutputChannels();
}
/**
* {@inheritDoc}
*/
@Override
public void flush() throws IOException, InterruptedException {
this.wrappedOutputGate.flush();
}
/**
* {@inheritDoc}
*/
@Override
public boolean isBroadcast() {
return this.wrappedOutputGate.isBroadcast();
}
/**
* {@inheritDoc}
*/
@Override
public int getNumberOfOutputChannels() {
return this.wrappedOutputGate.getNumberOfOutputChannels();
}
/**
* {@inheritDoc}
*/
@Override
public AbstractOutputChannel<T> getOutputChannel(final int pos) {
return this.wrappedOutputGate.getOutputChannel(pos);
}
/**
* {@inheritDoc}
*/
@Override
public ChannelSelector<T> getChannelSelector() {
return this.wrappedOutputGate.getChannelSelector();
}
/**
* {@inheritDoc}
*/
@Override
public void requestClose() throws IOException, InterruptedException {
this.wrappedOutputGate.requestClose();
}
/**
* {@inheritDoc}
*/
@Override
public void removeAllOutputChannels() {
this.wrappedOutputGate.removeAllOutputChannels();
}
/**
* {@inheritDoc}
*/
@Override
public NetworkOutputChannel<T> createNetworkOutputChannel(final OutputGate<T> outputGate,
final ChannelID channelID, final ChannelID connectedChannelID) {
return this.wrappedOutputGate.createNetworkOutputChannel(outputGate, channelID, connectedChannelID);
}
/**
* {@inheritDoc}
*/
@Override
public InMemoryOutputChannel<T> createInMemoryOutputChannel(final OutputGate<T> outputGate,
final ChannelID channelID, final ChannelID connectedChannelID) {
return this.wrappedOutputGate.createInMemoryOutputChannel(outputGate, channelID, connectedChannelID);
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package eu.stratosphere.nephele.protocols;
import java.io.IOException;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.plugins.PluginID;
public interface PluginCommunicationProtocol extends VersionedProtocol {
void sendData(PluginID pluginID, IOReadableWritable data) throws IOException;
IOReadableWritable requestData(PluginID pluginID, IOReadableWritable data) throws IOException;
}
......@@ -21,7 +21,6 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -60,21 +59,16 @@ import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
import eu.stratosphere.nephele.instance.HardwareDescription;
import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
import eu.stratosphere.nephele.instance.InstanceConnectionInfo;
import eu.stratosphere.nephele.io.IOReadableWritable;
import eu.stratosphere.nephele.io.channels.ChannelID;
import eu.stratosphere.nephele.ipc.RPC;
import eu.stratosphere.nephele.ipc.Server;
import eu.stratosphere.nephele.jobgraph.JobID;
import eu.stratosphere.nephele.net.NetUtils;
import eu.stratosphere.nephele.plugins.PluginID;
import eu.stratosphere.nephele.plugins.PluginManager;
import eu.stratosphere.nephele.plugins.TaskManagerPlugin;
import eu.stratosphere.nephele.profiling.ProfilingUtils;
import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
import eu.stratosphere.nephele.protocols.ChannelLookupProtocol;
import eu.stratosphere.nephele.protocols.InputSplitProviderProtocol;
import eu.stratosphere.nephele.protocols.JobManagerProtocol;
import eu.stratosphere.nephele.protocols.PluginCommunicationProtocol;
import eu.stratosphere.nephele.protocols.TaskOperationProtocol;
import eu.stratosphere.nephele.services.iomanager.IOManager;
import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
......@@ -93,7 +87,7 @@ import eu.stratosphere.nephele.util.StringUtils;
*
* @author warneke
*/
public class TaskManager implements TaskOperationProtocol, PluginCommunicationProtocol {
public class TaskManager implements TaskOperationProtocol {
private static final Log LOG = LogFactory.getLog(TaskManager.class);
......@@ -103,8 +97,6 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
private final ChannelLookupProtocol lookupService;
private final PluginCommunicationProtocol pluginCommunicationService;
private final ExecutorService executorService = Executors.newCachedThreadPool();
private static final int handlerCount = 1;
......@@ -141,30 +133,17 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
private final HardwareDescription hardwareDescription;
private final Map<PluginID, TaskManagerPlugin> taskManagerPlugins;
/**
* Stores whether the task manager has already been shut down.
*/
private boolean isShutDown = false;
/**
* Constructs a new task manager, starts its IPC service and attempts to discover the job manager to
* receive an initial configuration. All parameters are obtained from the
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
*/
public TaskManager() throws Exception {
this(null);
}
/**
* Constructs a new task manager, starts its IPC service and attempts to discover the job manager to
* receive an initial configuration. All parameters are obtained from the
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
*
* @param pluginDir The directory to load plug-ins from.
*/
public TaskManager(String pluginDir) throws Exception {
public TaskManager() throws Exception {
// IMPORTANT! At this point, the GlobalConfiguration must have been read!
......@@ -245,17 +224,6 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
}
this.lookupService = lookupService;
// Try to create local stub for the plugin communication service
PluginCommunicationProtocol pluginCommunicationService = null;
try {
pluginCommunicationService = RPC.getProxy(PluginCommunicationProtocol.class, jobManagerAddress,
NetUtils.getSocketFactory());
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
throw new Exception("Failed to initialize plugin communication protocol. " + e.getMessage(), e);
}
this.pluginCommunicationService = pluginCommunicationService;
// Start local RPC server
Server taskManagerServer = null;
try {
......@@ -328,13 +296,6 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
this.ioManager = new IOManager(tmpDirPaths);
// Load the plugins
if (pluginDir != null) {
this.taskManagerPlugins = PluginManager.getTaskManagerPlugins(this, pluginDir);
} else {
this.taskManagerPlugins = Collections.emptyMap();
}
// Add shutdown hook for clean up tasks
Runtime.getRuntime().addShutdownHook(new TaskManagerCleanUp(this));
}
......@@ -371,7 +332,7 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
// Create a new task manager object
TaskManager taskManager = null;
try {
taskManager = new TaskManager(configDir);
taskManager = new TaskManager();
} catch (Exception e) {
LOG.fatal("Taskmanager startup failed:" + StringUtils.stringifyException(e));
System.exit(FAILURERETURNCODE);
......@@ -611,14 +572,6 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
task.registerProfiler(this.profiler, jobConfiguration);
}
// Allow plugins to register their listeners for this task
if (!this.taskManagerPlugins.isEmpty()) {
final Iterator<TaskManagerPlugin> it = this.taskManagerPlugins.values().iterator();
while (it.hasNext()) {
it.next().registerTask(id, jobConfiguration, ee);
}
}
this.runningTasks.put(id, task);
}
}
......@@ -652,14 +605,6 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
// Unregister task from memory manager
task.unregisterMemoryManager(this.memoryManager);
// Allow plugins to unregister their listeners for this task
if (!this.taskManagerPlugins.isEmpty()) {
final Iterator<TaskManagerPlugin> it = this.taskManagerPlugins.values().iterator();
while (it.hasNext()) {
it.next().unregisterTask(id, task.getEnvironment());
}
}
// Unregister task from library cache manager
try {
LibraryCacheManager.unregister(task.getJobID());
......@@ -744,9 +689,6 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
// Stop RPC proxy for the lookup service
RPC.stopProxy(this.lookupService);
// Stop RPC proxy for the plugin communication service
RPC.stopProxy(this.pluginCommunicationService);
// Shut down the own RPC server
this.taskManagerServer.stop();
......@@ -779,12 +721,6 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
}
}
// Shut down the plugins
final Iterator<TaskManagerPlugin> it = this.taskManagerPlugins.values().iterator();
while (it.hasNext()) {
it.next().shutdown();
}
this.isShutDown = true;
}
......@@ -856,74 +792,6 @@ public class TaskManager implements TaskOperationProtocol, PluginCommunicationPr
this.byteBufferedChannelManager.invalidateLookupCacheEntries(channelIDs);
}
/**
* Sends data from the plugin with the given ID to the respective component of the plugin running at the job
* manager.
*
* @param pluginID
* the ID of plugin
* @param data
* the data to be sent
* @throws IOException
* thrown if an I/O error occurs during the RPC call
*/
public void sendDataToJobManager(final PluginID pluginID, final IOReadableWritable data) throws IOException {
synchronized (this.pluginCommunicationService) {
this.pluginCommunicationService.sendData(pluginID, data);
}
}
/**
* Requests data for the plugin with the given ID from the respective plugin component running at the job manager.
*
* @param pluginID
* the ID of the plugin
* @param data
* the data to specify the request
* @return the requested data
* @throws IOException
* thrown if an I/O error occurs during the RPC call
*/
public IOReadableWritable requestDataFromJobManager(final PluginID pluginID, final IOReadableWritable data)
throws IOException {
synchronized (this.pluginCommunicationService) {
return this.pluginCommunicationService.requestData(pluginID, data);
}
}
/**
* {@inheritDoc}
*/
@Override
public void sendData(final PluginID pluginID, final IOReadableWritable data) throws IOException {
final TaskManagerPlugin tmp = this.taskManagerPlugins.get(pluginID);
if (tmp == null) {
LOG.error("Cannot find task manager plugin for plugin ID " + pluginID);
return;
}
tmp.sendData(data);
}
/**
* {@inheritDoc}
*/
@Override
public IOReadableWritable requestData(final PluginID pluginID, final IOReadableWritable data) throws IOException {
final TaskManagerPlugin tmp = this.taskManagerPlugins.get(pluginID);
if (tmp == null) {
LOG.error("Cannot find task manager plugin for plugin ID " + pluginID);
return null;
}
return tmp.requestData(data);
}
/**
* Checks, whether the given strings describe existing directories that are writable. If that is not
* the case, an exception is raised.
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
#######################################################################################################################
##
## Copyright (C) 2010 by the Stratosphere project (http://stratosphere.eu)
##
## Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
## an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
## specific language governing permissions and limitations under the License.
##
#######################################################################################################################
-->
<plugins>
</plugins>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册