提交 1aab122b 编写于 作者: S StephanEwen

Cleanup and simplification in the taskmanager and the local instances and mini cluster.

上级 b0fbce71
......@@ -30,6 +30,7 @@ import eu.stratosphere.configuration.ConfigConstants;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.client.JobClient;
import eu.stratosphere.nephele.instance.HardwareDescriptionFactory;
import eu.stratosphere.nephele.instance.local.LocalTaskManagerThread;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobmanager.JobManager;
......@@ -46,7 +47,7 @@ public class LocalDistributedExecutor extends PlanExecutor {
private static final int JOB_MANAGER_RPC_PORT = 6498;
private static final int SLEEP_TIME = 100;
private static final int SLEEP_TIME = 50;
private static final int START_STOP_TIMEOUT = 2000;
......@@ -54,53 +55,30 @@ public class LocalDistributedExecutor extends PlanExecutor {
private boolean running = false;
private JobManagerThread jobManagerThread;
private JobManager jobManager;
private List<LocalTaskManagerThread> taskManagerThreads = new ArrayList<LocalTaskManagerThread>();
public static class JobManagerThread extends Thread {
JobManager jm;
public JobManagerThread(JobManager jm) {
this.jm = jm;
public synchronized void start(int numTaskMgr) throws Exception {
if (this.running) {
return;
}
@Override
public void run() {
this.jm.runTaskLoop();
}
public void shutDown() {
this.jm.shutdown();
}
// we need to down size the memory. determine the memory and divide it by the number of task managers
long javaMem = HardwareDescriptionFactory.extractFromSystem().getSizeOfFreeMemory();
javaMem /= numTaskMgr;
public boolean isShutDown() {
return this.jm.isShutDown();
}
}
public synchronized void start(int numTaskMgr) throws InterruptedException {
if (this.running) {
return;
}
// convert memory from bytes to megabytes
javaMem >>>= 20;
Configuration conf = NepheleMiniCluster.getMiniclusterDefaultConfig(
JOB_MANAGER_RPC_PORT, 6500, 7501, null, true, true, false);
JOB_MANAGER_RPC_PORT, 6500, 7501, javaMem, null, false, true, false);
GlobalConfiguration.includeConfiguration(conf);
// start job manager
JobManager jobManager;
try {
jobManager = new JobManager(ExecutionMode.CLUSTER);
}
catch (Exception e) {
e.printStackTrace();
return;
}
this.jobManagerThread = new JobManagerThread(jobManager);
this.jobManagerThread.setDaemon(true);
this.jobManagerThread.start();
this.jobManager = new JobManager(ExecutionMode.CLUSTER);
// start the task managers
for (int tm = 0; tm < numTaskMgr; tm++) {
......@@ -115,8 +93,7 @@ public class LocalDistributedExecutor extends PlanExecutor {
GlobalConfiguration.includeConfiguration(tmConf);
LocalTaskManagerThread t = new LocalTaskManagerThread(
"LocalDistributedExecutor: LocalTaskManagerThread-#" + tm, numTaskMgr);
LocalTaskManagerThread t = new LocalTaskManagerThread("LocalDistributedExecutor: LocalTaskManagerThread-#" + tm);
t.start();
taskManagerThreads.add(t);
......@@ -178,12 +155,10 @@ public class LocalDistributedExecutor extends PlanExecutor {
}
// 2. shut down job manager
this.jobManagerThread.shutDown();
this.jobManagerThread.interrupt();
this.jobManagerThread.join(START_STOP_TIMEOUT);
this.jobManager.shutdown();
for (int sleep = 0; sleep < START_STOP_TIMEOUT; sleep += SLEEP_TIME) {
if (this.jobManagerThread.isShutDown()) {
if (this.jobManager.isShutDown()) {
break;
}
......@@ -191,12 +166,12 @@ public class LocalDistributedExecutor extends PlanExecutor {
}
try {
if (!this.jobManagerThread.isShutDown()) {
if (!this.jobManager.isShutDown()) {
throw new RuntimeException(String.format("Job manager shut down timed out (%d ms).", START_STOP_TIMEOUT));
}
} finally {
this.taskManagerThreads.clear();
this.jobManagerThread = null;
this.jobManager = null;
this.running = false;
}
}
......
......@@ -42,7 +42,9 @@ public class NepheleMiniCluster {
private static final int DEFAULT_TM_DATA_PORT = 7501;
private static final boolean DEFAULT_VISUALIZER_ENABLED = true;
private static final long DEFAULT_MEMORY_SIZE = -1;
private static final boolean DEFAULT_LAZY_MEMORY_ALLOCATION = true;
// --------------------------------------------------------------------------------------------
......@@ -54,19 +56,19 @@ public class NepheleMiniCluster {
private int taskManagerDataPort = DEFAULT_TM_DATA_PORT;
private long memorySize = DEFAULT_MEMORY_SIZE;
private String configDir;
private String hdfsConfigFile;
private boolean visualizerEnabled = DEFAULT_VISUALIZER_ENABLED;
private boolean lazyMemoryAllocation = DEFAULT_LAZY_MEMORY_ALLOCATION;
private boolean defaultOverwriteFiles = false;
private boolean defaultAlwaysCreateDirectory = false;
private Thread runner;
private JobManager jobManager;
// ------------------------------------------------------------------------
......@@ -97,6 +99,14 @@ public class NepheleMiniCluster {
this.taskManagerDataPort = taskManagerDataPort;
}
public long getMemorySize() {
return memorySize;
}
public void setMemorySize(long memorySize) {
this.memorySize = memorySize;
}
public String getConfigDir() {
return configDir;
}
......@@ -113,12 +123,12 @@ public class NepheleMiniCluster {
this.hdfsConfigFile = hdfsConfigFile;
}
public boolean isVisualizerEnabled() {
return visualizerEnabled;
public boolean isLazyMemoryAllocation() {
return lazyMemoryAllocation;
}
public void setVisualizerEnabled(boolean visualizerEnabled) {
this.visualizerEnabled = visualizerEnabled;
public void setLazyMemoryAllocation(boolean lazyMemoryAllocation) {
this.lazyMemoryAllocation = lazyMemoryAllocation;
}
public boolean isDefaultOverwriteFiles() {
......@@ -156,7 +166,7 @@ public class NepheleMiniCluster {
GlobalConfiguration.loadConfiguration(configDir);
} else {
Configuration conf = getMiniclusterDefaultConfig(jobManagerRpcPort, taskManagerRpcPort,
taskManagerDataPort, hdfsConfigFile, visualizerEnabled, defaultOverwriteFiles, defaultAlwaysCreateDirectory);
taskManagerDataPort, memorySize, hdfsConfigFile, lazyMemoryAllocation, defaultOverwriteFiles, defaultAlwaysCreateDirectory);
GlobalConfiguration.includeConfiguration(conf);
}
......@@ -164,7 +174,7 @@ public class NepheleMiniCluster {
// we need to do this here, because the format classes may have been initialized before the mini cluster was started
initializeIOFormatClasses();
// before we start the jobmanager, we need to make sure that there are no lingering IPC threads from before
// before we start the JobManager, we need to make sure that there are no lingering IPC threads from before
// check that all threads are done before we return
Thread[] allThreads = new Thread[Thread.activeCount()];
int numThreads = Thread.enumerate(allThreads);
......@@ -172,23 +182,13 @@ public class NepheleMiniCluster {
for (int i = 0; i < numThreads; i++) {
Thread t = allThreads[i];
String name = t.getName();
if (name.equals("Local Taskmanager IO Loop") || name.startsWith("IPC")) {
if (name.startsWith("IPC")) {
t.join();
}
}
// start the job manager
jobManager = new JobManager(ExecutionMode.LOCAL);
runner = new Thread("JobManager Task Loop") {
@Override
public void run() {
// run the main task loop
jobManager.runTaskLoop();
}
};
runner.setDaemon(true);
runner.start();
waitForJobManagerToBecomeReady();
}
}
......@@ -199,12 +199,6 @@ public class NepheleMiniCluster {
jobManager.shutdown();
jobManager = null;
}
if (runner != null) {
runner.interrupt();
runner.join();
runner = null;
}
}
}
......@@ -215,7 +209,7 @@ public class NepheleMiniCluster {
private void waitForJobManagerToBecomeReady() throws InterruptedException {
Map<InstanceType, InstanceTypeDescription> instanceMap;
while ((instanceMap = jobManager.getMapOfAvailableInstanceTypes()) == null || instanceMap.isEmpty()) {
Thread.sleep(100);
Thread.sleep(50);
}
}
......@@ -235,7 +229,7 @@ public class NepheleMiniCluster {
}
public static Configuration getMiniclusterDefaultConfig(int jobManagerRpcPort, int taskManagerRpcPort,
int taskManagerDataPort, String hdfsConfigFile, boolean visualization,
int taskManagerDataPort, long memorySize, String hdfsConfigFile, boolean lazyMemory,
boolean defaultOverwriteFiles, boolean defaultAlwaysCreateDirectory)
{
final Configuration config = new Configuration();
......@@ -249,12 +243,11 @@ public class NepheleMiniCluster {
// with the low dop, we can use few RPC handlers
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_HANDLERS_KEY, 2);
config.setBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, lazyMemory);
// polling interval
config.setInteger(ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, 2);
// enable / disable features
config.setBoolean("jobmanager.visualization.enable", visualization);
// hdfs
if (hdfsConfigFile != null) {
config.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, hdfsConfigFile);
......@@ -264,6 +257,10 @@ public class NepheleMiniCluster {
config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, defaultOverwriteFiles);
config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, defaultAlwaysCreateDirectory);
if (memorySize > 0) {
config.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, memorySize);
}
return config;
}
}
\ No newline at end of file
......@@ -82,6 +82,12 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";
/**
* The key for the config parameter defining whether the memory manager allocates memory lazy.
*/
public static final String TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY = "taskmanager.memory.lazyalloc";
/**
* The config parameter defining the number of buffers used in the network stack. This defines the
* number of possible tasks and shuffles.
......@@ -93,6 +99,12 @@ public final class ConfigConstants {
*/
public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";
/**
* Parameter for the interval in which the RaskManager sends the periodic heart beat messages
* to the JobManager (in msecs).
*/
public static final String TASK_MANAGER_HEARTBEAT_INTERVAL_KEY = "taskmanager.heartbeat-interval";
/**
* Parameter for the maximum fan for out-of-core algorithms.
* Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
......@@ -124,10 +136,8 @@ public final class ConfigConstants {
public static final String JOBCLIENT_POLLING_INTERVAL_KEY = "jobclient.polling.interval";
// ------------------------ Hadoop Configuration ------------------------
/**
* Path to hdfs-defaul.xml file
*/
......@@ -292,7 +302,12 @@ public final class ConfigConstants {
/**
* The default fraction of the free memory allocated by the task manager's memory manager.
*/
public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.7f;
public static final float DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION = 0.8f;
/**
* The default setting for the memory manager lazy allocation feature.
*/
public static final boolean DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION = false;
/**
* Default number of buffers used in the network stack.
......@@ -304,6 +319,11 @@ public final class ConfigConstants {
*/
public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768;
/**
* The default interval for TaskManager heart beats (2000 msecs).
*/
public static final int DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL = 2000;
/**
* The default value for the JobClient's polling interval. 2 Seconds.
*/
......
......@@ -50,15 +50,7 @@ public class HardwareDescriptionFactory {
* The regular expression used to extract the size of the physical memory
* under Linux.
*/
private static final Pattern LINUX_MEMORY_REGEX = Pattern
.compile("^MemTotal:\\s*(\\d+)\\s+kB$");
/**
* The fraction of free memory that goes into the memory manager by default.
*/
private static float RUNTIME_MEMORY_THRESHOLD = GlobalConfiguration.getFloat(
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
private static final Pattern LINUX_MEMORY_REGEX = Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");
/**
* Private constructor, so class cannot be instantiated.
......@@ -73,25 +65,16 @@ public class HardwareDescriptionFactory {
* one value for the hardware description cannot be determined
*/
public static HardwareDescription extractFromSystem() {
return extractFromSystem(1);
}
int numberOfCPUCores = Runtime.getRuntime().availableProcessors();
public static HardwareDescription extractFromSystem(final int taskManagersPerJVM) {
final int numberOfCPUCores = Runtime.getRuntime().availableProcessors();
final long sizeOfPhysicalMemory = getSizeOfPhysicalMemory();
long sizeOfPhysicalMemory = getSizeOfPhysicalMemory();
if (sizeOfPhysicalMemory < 0) {
return null;
sizeOfPhysicalMemory = 1;
}
final long sizeOfFreeMemory = getSizeOfFreeMemory() / taskManagersPerJVM;
if (sizeOfFreeMemory < 0) {
return null;
}
long sizeOfFreeMemory = getSizeOfFreeMemory();
return new HardwareDescription(numberOfCPUCores, sizeOfPhysicalMemory,
sizeOfFreeMemory);
return new HardwareDescription(numberOfCPUCores, sizeOfPhysicalMemory, sizeOfFreeMemory);
}
/**
......@@ -119,8 +102,11 @@ public class HardwareDescriptionFactory {
* determined
*/
private static long getSizeOfFreeMemory() {
float fractionToUse = GlobalConfiguration.getFloat(
ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
Runtime r = Runtime.getRuntime();
return (long) (RUNTIME_MEMORY_THRESHOLD * (r.maxMemory() - r.totalMemory() + r.freeMemory()));
return (long) (fractionToUse * (r.maxMemory() - r.totalMemory() + r.freeMemory()));
}
/**
......
......@@ -32,27 +32,27 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
/**
* The network address the instance's task manager binds its sockets to.
*/
private InetAddress inetAddress = null;
private InetAddress inetAddress;
/**
* The port the instance's task manager runs its IPC service on.
*/
private int ipcPort = 0;
private int ipcPort;
/**
* The port the instance's task manager expects to receive transfer envelopes on.
*/
private int dataPort = 0;
private int dataPort;
/**
* The host name of the instance.
*/
private String hostName = null;
private String hostName;
/**
* The domain name of the instance.
*/
private String domainName = null;
private String domainName;
/**
* Constructs a new instance connection info object. The constructor will attempt to retrieve the instance's
......@@ -65,7 +65,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @param dataPort
* the port instance's task manager expects to receive transfer envelopes on
*/
public InstanceConnectionInfo(final InetAddress inetAddress, final int ipcPort, final int dataPort) {
public InstanceConnectionInfo(InetAddress inetAddress, int ipcPort, int dataPort) {
if (inetAddress == null) {
throw new IllegalArgumentException("Argument inetAddress must not be null");
......@@ -147,8 +147,7 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
/**
* Constructs an empty {@link InstanceConnectionInfo} object.
*/
public InstanceConnectionInfo() {
}
public InstanceConnectionInfo() {}
/**
* Returns the port instance's task manager runs its IPC service on.
......@@ -156,7 +155,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the port instance's task manager runs its IPC service on
*/
public int getIPCPort() {
return this.ipcPort;
}
......@@ -166,7 +164,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the port instance's task manager expects to receive transfer envelopes on
*/
public int getDataPort() {
return this.dataPort;
}
......@@ -176,7 +173,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the network address the instance's task manager binds its sockets to
*/
public InetAddress getAddress() {
return this.inetAddress;
}
......@@ -187,7 +183,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the host name of the instance
*/
public String getHostName() {
return this.hostName;
}
......@@ -197,7 +192,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the domain name of the instance or <code>null</code> if the domain name could not be determined
*/
public String getDomainName() {
return this.domainName;
}
......@@ -278,16 +272,13 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
@Override
public int hashCode() {
return this.inetAddress.hashCode();
}
@Override
public int compareTo(final InstanceConnectionInfo o) {
return this.getAddress().getHostName()
.compareTo(((InstanceConnectionInfo) o).getAddress().getHostName());
return this.getAddress().getHostName().compareTo(((InstanceConnectionInfo) o).getAddress().getHostName());
}
}
......@@ -13,12 +13,9 @@
package eu.stratosphere.nephele.instance.cluster;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
......@@ -27,8 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
......@@ -124,11 +119,6 @@ public class ClusterManager implements InstanceManager {
*/
private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";
/**
* Regular expression to extract the IP and the instance type of a cluster instance from the slave file.
*/
private static final Pattern IP_TO_INSTANCE_TYPE_PATTERN = Pattern.compile("^(\\S+)\\s*(\\S*)\\s*$");
// ------------------------------------------------------------------------
// Fields
// ------------------------------------------------------------------------
......@@ -311,9 +301,6 @@ public class ClusterManager implements InstanceManager {
// load the network topology from the slave file
this.networkTopology = loadNetworkTopology();
// load IP to instance type mapping from slave file
loadIPToInstanceTypeMapping();
// look every BASEINTERVAL milliseconds for crashed hosts
final boolean runTimerAsDaemon = true;
new Timer(runTimerAsDaemon).schedule(cleanupStaleMachines, 1000, 1000);
......@@ -322,67 +309,6 @@ public class ClusterManager implements InstanceManager {
updateInstaceTypeDescriptionMap();
}
/**
* Reads the IP to instance type mapping from the slave file.
*/
private void loadIPToInstanceTypeMapping() {
final String configDir = GlobalConfiguration.getString(CONFIG_DIR_KEY, null);
if (configDir == null) {
LOG.error("Cannot find configuration directory to read IP to instance type mapping");
return;
}
final File slaveFile = new File(configDir + File.separator + SLAVE_FILE_NAME);
if (!slaveFile.exists()) {
LOG.error("Cannot access slave file to read IP to instance type mapping");
return;
}
try {
final BufferedReader input = new BufferedReader(new FileReader(slaveFile));
String line = null;
while ((line = input.readLine()) != null) {
final Matcher m = IP_TO_INSTANCE_TYPE_PATTERN.matcher(line);
if (!m.matches()) {
LOG.error("Entry does not match format: " + line);
continue;
}
InetAddress address = null;
String host = m.group(1);
try {
final int pos = host.lastIndexOf('/');
if (pos != -1) {
host = host.substring(pos + 1);
}
address = InetAddress.getByName(host);
} catch (UnknownHostException e) {
LOG.error("Cannot resolve " + host + " to a hostname/IP address", e);
continue;
}
InstanceType instanceType = null;
String instanceTypeName = m.group(2);
if (instanceTypeName != null && instanceTypeName.length() > 0) {
instanceType = getInstanceTypeByName(instanceTypeName);
if (instanceType != null) {
this.ipToInstanceTypeMapping.put(address, instanceType);
}
}
}
input.close();
} catch (IOException e) {
LOG.error("Cannot load IP to instance type mapping from file " + e);
}
}
/**
* Sorts the list of available instance types by the number of CPU cores in a descending order.
*/
......@@ -414,7 +340,7 @@ public class ClusterManager implements InstanceManager {
// Check if slave file exists
final String configDir = GlobalConfiguration.getString(CONFIG_DIR_KEY, null);
if (configDir == null) {
LOG.error("Cannot find configuration directory to load network topology, using flat topology instead");
LOG.info("Cannot find configuration directory to load network topology, using flat topology instead");
return NetworkTopology.createEmptyTopology();
}
......
......@@ -48,9 +48,6 @@ import eu.stratosphere.nephele.util.SerializableHashMap;
* task manager which is executed within the same process as the job manager. Moreover, it determines the hardware
* characteristics of the machine it runs on and generates a default instance type with the identifier "default". If
* desired this default instance type can also be overwritten.
* <p>
* This class is thread-safe.
*
*/
public class LocalInstanceManager implements InstanceManager {
......@@ -137,21 +134,19 @@ public class LocalInstanceManager implements InstanceManager {
this.instanceTypeDescriptionMap = new SerializableHashMap<InstanceType, InstanceTypeDescription>();
this.localTaskManagerThread = new LocalTaskManagerThread("Local Taskmanager IO Loop",1);
this.localTaskManagerThread = new LocalTaskManagerThread("Local Taskmanager Heartbeat Loop");
this.localTaskManagerThread.start();
}
@Override
public InstanceType getDefaultInstanceType() {
return this.defaultInstanceType;
}
@Override
public InstanceType getInstanceTypeByName(final String instanceTypeName) {
if (this.defaultInstanceType.getIdentifier().equals(instanceTypeName)) {
return this.defaultInstanceType;
}
......@@ -229,21 +224,13 @@ public class LocalInstanceManager implements InstanceManager {
@Override
public void shutdown() {
// Stop the internal instance of the task manager
if (this.localTaskManagerThread != null) {
while (!this.localTaskManagerThread.isShutDown()) {
try {
// Interrupt the thread running the task manager
this.localTaskManagerThread.interrupt();
Thread.sleep(100);
} catch (InterruptedException e) {
break;
}
this.localTaskManagerThread.shutDown();
}
// Clear the instance type description list
if (this.instanceTypeDescriptionMap != null) {
this.instanceTypeDescriptionMap.clear();
}
......@@ -259,14 +246,12 @@ public class LocalInstanceManager implements InstanceManager {
@Override
public NetworkTopology getNetworkTopology(final JobID jobID) {
return this.networkTopology;
}
@Override
public void setInstanceListener(final InstanceListener instanceListener) {
this.instanceListener = instanceListener;
}
......@@ -300,7 +285,6 @@ public class LocalInstanceManager implements InstanceManager {
@Override
public Map<InstanceType, InstanceTypeDescription> getMapOfAvailableInstanceTypes() {
return this.instanceTypeDescriptionMap;
}
......@@ -342,17 +326,12 @@ public class LocalInstanceManager implements InstanceManager {
} else {
throw new InstanceException("No instance of type " + entry.getKey() + " available");
}
}
}
}
@Override
public AbstractInstance getInstanceByName(final String name) {
if (name == null) {
throw new IllegalArgumentException("Argument name must not be null");
}
......@@ -371,7 +350,6 @@ public class LocalInstanceManager implements InstanceManager {
@Override
public void cancelPendingRequests(final JobID jobID) {
// The local instance manager does not support pending requests, so nothing to do here
}
......
......@@ -15,11 +15,8 @@ package eu.stratosphere.nephele.instance.local;
import eu.stratosphere.nephele.taskmanager.TaskManager;
import java.io.IOException;
/**
* This class represents the thread which runs the task manager when Nephele is executed in local mode.
*
*/
public class LocalTaskManagerThread extends Thread {
......@@ -31,11 +28,11 @@ public class LocalTaskManagerThread extends Thread {
/**
* Constructs a new thread to run the task manager in Nephele's local mode.
*/
public LocalTaskManagerThread(final String name, final int taskManagersPerJVM) {
public LocalTaskManagerThread(String name) {
super(name);
TaskManager tmpTaskManager = null;
try {
tmpTaskManager = new TaskManager(taskManagersPerJVM);
tmpTaskManager = new TaskManager();
} catch (Exception e) {
throw new RuntimeException(e);
}
......@@ -44,12 +41,12 @@ public class LocalTaskManagerThread extends Thread {
@Override
public void run() {
this.taskManager.runIOLoop();
this.taskManager.runHeartbeatLoop();
// Wait until the task manager is shut down
while (!this.taskManager.isShutDown()) {
try {
Thread.sleep(100);
Thread.sleep(20);
} catch (InterruptedException e) {
break;
}
......
......@@ -11,24 +11,6 @@
* specific language governing permissions and limitations under the License.
**********************************************************************************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package eu.stratosphere.nephele.jobmanager;
import java.io.File;
......@@ -140,15 +122,15 @@ import eu.stratosphere.util.StringUtils;
*
*/
public class JobManager implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol,
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol {
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
{
public static enum ExecutionMode { LOCAL, CLUSTER }
// --------------------------------------------------------------------------------------------
private static final Log LOG = LogFactory.getLog(JobManager.class);
private Server jobManagerServer = null;
private final Server jobManagerServer;
private final JobManagerProfiler profiler;
......@@ -170,36 +152,31 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
private final ExecutorService executorService = Executors.newCachedThreadPool(ExecutorThreadFactory.INSTANCE);
private final static int SLEEPINTERVAL = 1000;
private final static int FAILURERETURNCODE = 1;
private final static int FAILURE_RETURN_CODE = 1;
private final AtomicBoolean isShutdownInProgress = new AtomicBoolean(false);
private volatile boolean isShutDown = false;
private volatile boolean isShutDown;
private WebInfoServer server;
public JobManager(ExecutionMode executionMode) {
final String ipcAddressString = GlobalConfiguration
.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
public JobManager(ExecutionMode executionMode) throws Exception {
final String ipcAddressString = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
InetAddress ipcAddress = null;
if (ipcAddressString != null) {
try {
ipcAddress = InetAddress.getByName(ipcAddressString);
} catch (UnknownHostException e) {
LOG.error("Cannot convert " + ipcAddressString + " to an IP address: "
+ StringUtils.stringifyException(e));
System.exit(FAILURERETURNCODE);
throw new Exception("Cannot convert " + ipcAddressString + " to an IP address: " + e.getMessage(), e);
}
}
final int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
// Read the suggested client polling interval
this.recommendedClientPollingInterval = GlobalConfiguration.getInteger(
ConfigConstants.JOBCLIENT_POLLING_INTERVAL_KEY, ConfigConstants.DEFAULT_JOBCLIENT_POLLING_INTERVAL);
......@@ -210,12 +187,13 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
// Register simple job archive
int archived_items = GlobalConfiguration.getInteger(
ConfigConstants.JOB_MANAGER_WEB_ARCHIVE_COUNT, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_ARCHIVE_COUNT);
if(archived_items > 0) {
if (archived_items > 0) {
this.archive = new MemoryArchivist(archived_items);
this.eventCollector.registerArchivist(archive);
}
else
else {
this.archive = null;
}
// Create the accumulator manager, with same archiving limit as web
// interface. We need to store the accumulators for at least one job.
......@@ -233,12 +211,10 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
try {
final int handlerCount = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_HANDLERS_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_IPC_HANDLERS);
this.jobManagerServer = RPC.getServer(this, rpcServerAddress.getHostName(), rpcServerAddress.getPort(),
handlerCount);
this.jobManagerServer = RPC.getServer(this, rpcServerAddress.getHostName(), rpcServerAddress.getPort(), handlerCount);
this.jobManagerServer.start();
} catch (IOException ioe) {
LOG.error("Cannot start RPC server: " + StringUtils.stringifyException(ioe));
System.exit(FAILURERETURNCODE);
} catch (IOException e) {
throw new Exception("Cannot start RPC server: " + e.getMessage(), e);
}
LOG.info("Starting job manager in " + executionMode + " mode");
......@@ -248,17 +224,15 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
if (executionMode == ExecutionMode.LOCAL) {
try {
this.instanceManager = new LocalInstanceManager();
} catch (RuntimeException rte) {
LOG.fatal("Cannot instantiate local instance manager: " + StringUtils.stringifyException(rte));
System.exit(FAILURERETURNCODE);
} catch (Throwable t) {
throw new Exception("Cannot instantiate local instance manager: " + t.getMessage(), t);
}
} else {
final String instanceManagerClassName = JobManagerUtils.getInstanceManagerClassName(executionMode);
LOG.info("Trying to load " + instanceManagerClassName + " as instance manager");
this.instanceManager = JobManagerUtils.loadInstanceManager(instanceManagerClassName);
if (this.instanceManager == null) {
LOG.error("Unable to load instance manager " + instanceManagerClassName);
System.exit(FAILURERETURNCODE);
throw new Exception("Unable to load instance manager " + instanceManagerClassName);
}
}
......@@ -269,8 +243,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
// Try to get the instance manager class name
this.scheduler = JobManagerUtils.loadScheduler(schedulerClassName, this, this.instanceManager);
if (this.scheduler == null) {
LOG.error("Unable to load scheduler " + schedulerClassName);
System.exit(FAILURERETURNCODE);
throw new Exception("Unable to load scheduler " + schedulerClassName);
}
// Create multicastManager
......@@ -282,8 +255,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
"eu.stratosphere.nephele.profiling.impl.JobManagerProfilerImpl");
this.profiler = ProfilingUtils.loadJobManagerProfiler(profilerClassName, ipcAddress);
if (this.profiler == null) {
LOG.error("Cannot load profiler");
System.exit(FAILURERETURNCODE);
throw new Exception("Cannot load profiler");
}
} else {
this.profiler = null;
......@@ -295,24 +267,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
/**
* This is the main
*/
public void runTaskLoop() {
while (!Thread.interrupted()) {
// Sleep
try {
Thread.sleep(SLEEPINTERVAL);
} catch (InterruptedException e) {
break;
}
// Do nothing here
}
}
public void shutdown() {
if (!this.isShutdownInProgress.compareAndSet(false, true)) {
......@@ -340,9 +294,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
try {
this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug(StringUtils.stringifyException(e));
}
LOG.debug(e);
}
}
......@@ -391,8 +343,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
* arguments from the command line
*/
public static void main(final String[] args) {
public static void main(String[] args) {
// determine if a valid log4j config exists and initialize a default logger if not
if (System.getProperty("log4j.configuration") == null) {
Logger root = Logger.getRootLogger();
......@@ -403,19 +354,26 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
root.setLevel(Level.INFO);
}
JobManager jobManager = initialize(args);
JobManager jobManager;
try {
jobManager = initialize(args);
// Start info server for jobmanager
jobManager.startInfoServer();
}
catch (Exception e) {
LOG.fatal(e.getMessage(), e);
System.exit(FAILURE_RETURN_CODE);
}
// Run the main task loop
jobManager.runTaskLoop();
// Clean up task are triggered through a shutdown hook
// Clean up is triggered through a shutdown hook
// freeze this thread to keep the JVM alive (the job manager threads are daemon threads)
try {
new Object().wait();
} catch (InterruptedException e) {}
}
@SuppressWarnings("static-access")
public static JobManager initialize(final String[] args) {
public static JobManager initialize(String[] args) throws Exception {
// output the version and revision information to the log
logVersionInformation();
......@@ -435,7 +393,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
line = parser.parse(options, args);
} catch (ParseException e) {
LOG.error("CLI Parsing failed. Reason: " + e.getMessage());
System.exit(FAILURERETURNCODE);
System.exit(FAILURE_RETURN_CODE);
}
final String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
......@@ -448,7 +406,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
executionMode = ExecutionMode.CLUSTER;
} else {
System.err.println("Unrecognized execution mode: " + executionModeName);
System.exit(FAILURERETURNCODE);
System.exit(FAILURE_RETURN_CODE);
}
// First, try to load global configuration
......
......@@ -15,7 +15,6 @@ package eu.stratosphere.nephele.jobmanager;
/**
* This class takes care of cleaning up when the job manager is closed.
*
*/
public class JobManagerCleanUp extends Thread {
......@@ -37,9 +36,7 @@ public class JobManagerCleanUp extends Thread {
@Override
public void run() {
// Shut down the job manager properly
this.jobManager.shutdown();
}
}
......@@ -32,24 +32,8 @@ import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
import eu.stratosphere.nephele.template.AbstractInvokable;
/**
* Default MemoryManager implementation giving hard memory guarantees. The implementation has the following properties:
* <ul>
* <li>arbitrary segment sizes (smaller than 2GB)</li>
* <li>{@code allocate()} and {@code release()} calls in arbitrary order are supported</li>
* <li>allocation data is stored in a dedicated structure</li>
* <li>first-fit selection strategy</li>
* <li>automatic re-integration of released segments</li>
* </ul>
* This implementation uses internal byte arrays to allocate the required memory and allows allocation sizes greater
* than 2GB. Due to the fact that the length of a single java byte array is bounded by {@link #java.lang.Integer.MAX_VALUE} (2GB),
* the manager works 2 dimensional byte array (i.e. with memory chunks). Please be aware that in order to keep the array
* access methods in the {@link DefaultMemorySegment} fast and simple, the actual allocated memory segments must not
* exceed 2GB and must be contained in a single memory chunk.
*
*/
public class DefaultMemoryManager implements MemoryManager
{
public class DefaultMemoryManager implements MemoryManager {
/**
* The default memory page size. Currently set to 32 KiBytes.
*/
......@@ -102,8 +86,7 @@ public class DefaultMemoryManager implements MemoryManager
* @param memorySize The total size of the memory to be managed by this memory manager.
* @param pageSize The size of the pages handed out by the memory manager.
*/
public DefaultMemoryManager(long memorySize, int pageSize)
{
public DefaultMemoryManager(long memorySize, int pageSize) {
// sanity checks
if (memorySize <= 0) {
throw new IllegalArgumentException("Size of total memory must be positive.");
......@@ -143,8 +126,7 @@ public class DefaultMemoryManager implements MemoryManager
@Override
public void shutdown()
{
public void shutdown() {
// -------------------- BEGIN CRITICAL SECTION -------------------
synchronized (this.lock)
{
......@@ -168,8 +150,7 @@ public class DefaultMemoryManager implements MemoryManager
}
public boolean verifyEmpty()
{
public boolean verifyEmpty() {
synchronized (this.lock) {
return this.freeSegments.size() == this.totalNumPages;
}
......@@ -179,9 +160,6 @@ public class DefaultMemoryManager implements MemoryManager
// MemoryManager interface implementation
// ------------------------------------------------------------------------
/* (non-Javadoc)
* @see eu.stratosphere.nephele.services.memorymanager.MemoryManager#allocatePages(eu.stratosphere.nephele.template.AbstractInvokable, int)
*/
@Override
public List<MemorySegment> allocatePages(AbstractInvokable owner, int numPages) throws MemoryAllocationException {
final ArrayList<MemorySegment> segs = new ArrayList<MemorySegment>(numPages);
......@@ -189,9 +167,6 @@ public class DefaultMemoryManager implements MemoryManager
return segs;
}
/* (non-Javadoc)
* @see eu.stratosphere.nephele.services.memorymanager.MemoryManager#allocatePages(eu.stratosphere.nephele.template.AbstractInvokable, java.util.List, int)
*/
@Override
public void allocatePages(AbstractInvokable owner, List<MemorySegment> target, int numPages)
throws MemoryAllocationException
......@@ -238,8 +213,7 @@ public class DefaultMemoryManager implements MemoryManager
@Override
public void release(MemorySegment segment)
{
public void release(MemorySegment segment) {
// check if segment is null or has already been freed
if (segment == null || segment.isFreed() || !(segment instanceof DefaultMemorySegment)) {
return;
......@@ -343,8 +317,7 @@ public class DefaultMemoryManager implements MemoryManager
@Override
public void releaseAll(AbstractInvokable owner)
{
public void releaseAll(AbstractInvokable owner) {
// -------------------- BEGIN CRITICAL SECTION -------------------
synchronized (this.lock)
{
......@@ -379,13 +352,11 @@ public class DefaultMemoryManager implements MemoryManager
return this.pageSize;
}
@Override
public int computeNumberOfPages(long numBytes) {
return getNumPages(numBytes);
}
@Override
public long roundDownToPageSizeMultiple(long numBytes) {
return numBytes & this.roundingMask;
......@@ -393,8 +364,7 @@ public class DefaultMemoryManager implements MemoryManager
// ------------------------------------------------------------------------
private final int getNumPages(long numBytes)
{
private final int getNumPages(long numBytes) {
if (numBytes < 0)
throw new IllegalArgumentException("The number of bytes to allocate must not be negative.");
......
......@@ -14,9 +14,7 @@
package eu.stratosphere.nephele.taskmanager;
/**
* This is an auxiliary thread to facilitate the shutdown of the
* task manager through a shutdown hook.
*
* This is an auxiliary thread to facilitate the shutdown of the task manager through a shutdown hook.
*/
public class TaskManagerCleanUp extends Thread {
......@@ -35,12 +33,9 @@ public class TaskManagerCleanUp extends Thread {
this.taskManager = taskManager;
}
@Override
public void run() {
// Call shutdown method for the task manager
this.taskManager.shutdown();
}
}
......@@ -38,6 +38,7 @@ public class LocalInstanceManagerTest {
@Test
public void testInstanceTypeFromConfiguration() {
try {
final String configDir = ServerTestUtils.getConfigDir();
if (configDir == null) {
fail("Cannot locate configuration directory");
......@@ -45,7 +46,6 @@ public class LocalInstanceManagerTest {
GlobalConfiguration.loadConfiguration(configDir);
// start JobManager
ExecutionMode executionMode = ExecutionMode.LOCAL;
JobManager jm = new JobManager(executionMode);
......@@ -71,4 +71,10 @@ public class LocalInstanceManagerTest {
jm.shutdown();
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Test caused an error: " + e.getMessage());
}
}
}
\ No newline at end of file
......@@ -24,10 +24,8 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -53,7 +51,7 @@ import eu.stratosphere.nephele.taskmanager.TaskManager;
import eu.stratosphere.nephele.taskmanager.runtime.RuntimeTask;
import eu.stratosphere.nephele.util.JarFileCreator;
import eu.stratosphere.nephele.util.ServerTestUtils;
import eu.stratosphere.util.StringUtils;
import eu.stratosphere.util.LogUtils;
/**
* This test is intended to cover the basic functionality of the {@link JobManager}.
......@@ -61,124 +59,47 @@ import eu.stratosphere.util.StringUtils;
public class JobManagerITCase {
static {
// initialize loggers
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.WARN);
LogUtils.initializeDefaultTestConsoleLogger();
}
/**
* The name of the test directory some tests read their input from.
*/
private static final String INPUT_DIRECTORY = "testDirectory";
private static JobManagerThread jobManagerThread = null;
private static Configuration configuration;
/**
* This is an auxiliary class to run the job manager thread.
*/
private static final class JobManagerThread extends Thread {
/**
* The job manager instance.
*/
private final JobManager jobManager;
/**
* Constructs a new job manager thread.
*
* @param jobManager
* the job manager to run in this thread.
*/
private JobManagerThread(JobManager jobManager) {
this.jobManager = jobManager;
}
/**
* {@inheritDoc}
*/
@Override
public void run() {
// Run task loop
this.jobManager.runTaskLoop();
// Shut down
this.jobManager.shutdown();
}
/**
* Checks whether the encapsulated job manager is completely shut down.
*
* @return <code>true</code> if the encapsulated job manager is completely shut down, <code>false</code>
* otherwise
*/
public boolean isShutDown() {
return this.jobManager.isShutDown();
}
}
private static JobManager jobManager;
/**
* Sets up Nephele in local mode.
* Starts the JobManager in local mode.
*/
@BeforeClass
public static void startNephele() {
GlobalConfiguration.loadConfiguration(ServerTestUtils.getConfigDir());
if (jobManagerThread == null) {
// create the job manager
JobManager jobManager;
try {
jobManager = new JobManager(ExecutionMode.LOCAL);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
return;
}
GlobalConfiguration.loadConfiguration(ServerTestUtils.getConfigDir());
configuration = GlobalConfiguration.getConfiguration(new String[] { ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY });
// Start job manager thread
if (jobManager != null) {
jobManagerThread = new JobManagerThread(jobManager);
jobManagerThread.start();
}
jobManager = new JobManager(ExecutionMode.LOCAL);
// Wait for the local task manager to arrive
try {
ServerTestUtils.waitForJobManagerToBecomeReady(jobManager);
} catch (Exception e) {
fail(StringUtils.stringifyException(e));
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
fail("Could not start job manager: " + e.getMessage());
}
}
/**
* Shuts Nephele down.
* Stops the JobManager
*/
@AfterClass
public static void stopNephele() {
if (jobManagerThread != null) {
jobManagerThread.interrupt();
while (!jobManagerThread.isShutDown()) {
try {
Thread.sleep(100);
} catch (InterruptedException i) {
break;
}
}
}
jobManager.shutdown();
jobManager = null;
}
/**
......
......@@ -48,6 +48,8 @@ public abstract class AbstractTestBase {
private static final int MINIMUM_HEAP_SIZE_MB = 192;
private static final long TASK_MANAGER_MEMORY_SIZE = 96;
protected final Configuration config;
......@@ -79,7 +81,8 @@ public abstract class AbstractTestBase {
public void startCluster() throws Exception {
this.executor = new NepheleMiniCluster();
this.executor.setDefaultOverwriteFiles(true);
this.executor.setLazyMemoryAllocation(true);
this.executor.setMemorySize(TASK_MANAGER_MEMORY_SIZE);
this.executor.start();
}
......
......@@ -40,5 +40,4 @@ public class WordCountITCase extends JavaProgramTestBase {
protected void testProgram() throws Exception {
WordCount.main(new String[] { textPath, resultPath });
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册