提交 73a94ffe 编写于 作者: U uce

Add log info message with memory usage after TaskManager startup finished.

This commit also introduces a logging thread, which repeatedly logs the
memory usage. It can be configured via:
    - 'taskmanager.debug.memory.startLogThread' (default: false)
    - 'taskmanager.debug.memory.logIntervalMs' (default: 5000)
上级 b692b049
......@@ -121,11 +121,21 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = "taskmanager.net.nettyHighWaterMark";
/**
* Parameter for the interval in which the RaskManager sends the periodic heart beat messages
* Parameter for the interval in which the TaskManager sends the periodic heart beat messages
* to the JobManager (in msecs).
*/
public static final String TASK_MANAGER_HEARTBEAT_INTERVAL_KEY = "taskmanager.heartbeat-interval";
/**
* Flag indicating whether to start a thread, which repeatedly logs the memory usage of the JVM.
*/
public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = "taskmanager.debug.memory.startLogThread";
/**
* The interval (in ms) for the log thread to log the current memory usage.
*/
public static final String TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = "taskmanager.debug.memory.logIntervalMs";
/**
* Parameter for the maximum fan for out-of-core algorithms.
* Corresponds to the maximum fan-in for merge-sorts and the maximum fan-out
......@@ -364,6 +374,16 @@ public final class ConfigConstants {
* The default interval for TaskManager heart beats (2000 msecs).
*/
public static final int DEFAULT_TASK_MANAGER_HEARTBEAT_INTERVAL = 2000;
/**
* The default value of the flag indicating whether to start a thread, which repeatedly logs the
* memory usage of the JVM (false, i.e. the thread is not started).
*/
public static final boolean DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD = false;
/**
* The default interval for the log thread to log the current memory usage (5000 msecs).
*/
public static final int DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS = 5000;
/**
* The default value for the JobClient's polling interval. 2 Seconds.
......
......@@ -15,6 +15,9 @@ package eu.stratosphere.nephele.taskmanager;
import java.io.File;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
......@@ -153,44 +156,43 @@ public class TaskManager implements TaskOperationProtocol {
/** Stores whether the task manager has already been shut down. */
private volatile boolean shutdownComplete;
/**
* Constructs a new task manager, starts its IPC service and attempts to discover the job manager to
* receive an initial configuration. All parameters are obtained from the
* receive an initial configuration. All parameters are obtained from the
* {@link GlobalConfiguration}, which must be loaded prior to instantiating the task manager.
*/
public TaskManager(ExecutionMode executionMode) throws Exception {
LOG.info("TaskManager started as user " + UserGroupInformation.getCurrentUser().getShortUserName());
LOG.info("User system property: " + System.getProperty("user.name"));
LOG.info("Execution mode: " + executionMode);
// IMPORTANT! At this point, the GlobalConfiguration must have been read!
final InetSocketAddress jobManagerAddress;
{
LOG.info("Reading location of job manager from configuration");
final String address = GlobalConfiguration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
final int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
if (address == null) {
throw new Exception("Job manager address not configured in the GlobalConfiguration.");
}
// Try to convert configured address to {@link InetAddress}
try {
final InetAddress tmpAddress = InetAddress.getByName(address);
jobManagerAddress = new InetSocketAddress(tmpAddress, port);
}
catch (UnknownHostException e) {
} catch (UnknownHostException e) {
LOG.fatal("Could not resolve JobManager host name.");
throw new Exception("Could not resolve JobManager host name: " + e.getMessage(), e);
}
LOG.info("Connecting to JobManager at: " + jobManagerAddress);
}
// Create RPC connection to the JobManager
try {
this.jobManager = RPC.getProxy(JobManagerProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
......@@ -198,7 +200,7 @@ public class TaskManager implements TaskOperationProtocol {
LOG.fatal("Could not connect to the JobManager: " + e.getMessage(), e);
throw new Exception("Failed to initialize connection to JobManager: " + e.getMessage(), e);
}
int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, -1);
if (ipcPort == -1) {
......@@ -207,17 +209,16 @@ public class TaskManager implements TaskOperationProtocol {
if (dataPort == -1) {
dataPort = getAvailablePort();
}
// Determine our own public facing address and start the server
{
final InetAddress taskManagerAddress;
try {
taskManagerAddress = getTaskManagerAddress(jobManagerAddress);
}
catch (Exception e) {
} catch (Exception e) {
throw new RuntimeException("The TaskManager failed to determine its own network address.", e);
}
this.localInstanceConnectionInfo = new InstanceConnectionInfo(taskManagerAddress, ipcPort, dataPort);
LOG.info("TaskManager connection information:" + this.localInstanceConnectionInfo);
......@@ -230,7 +231,7 @@ public class TaskManager implements TaskOperationProtocol {
throw new Exception("Failed to start taskmanager server. " + e.getMessage(), e);
}
}
// Try to create local stub of the global input split provider
try {
this.globalInputSplitProvider = RPC.getProxy(InputSplitProviderProtocol.class, jobManagerAddress, NetUtils.getSocketFactory());
......@@ -257,31 +258,33 @@ public class TaskManager implements TaskOperationProtocol {
// Load profiler if it should be used
if (GlobalConfiguration.getBoolean(ProfilingUtils.ENABLE_PROFILING_KEY, false)) {
final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.TASKMANAGER_CLASSNAME_KEY,
"eu.stratosphere.nephele.profiling.impl.TaskManagerProfilerImpl");
"eu.stratosphere.nephele.profiling.impl.TaskManagerProfilerImpl");
this.profiler = ProfilingUtils.loadTaskManagerProfiler(profilerClassName, jobManagerAddress.getAddress(),
this.localInstanceConnectionInfo);
this.localInstanceConnectionInfo);
if (this.profiler == null) {
LOG.error("Cannot find class name for the profiler.");
} else {
}
else {
LOG.info("Profiling of jobs is enabled.");
}
} else {
}
else {
this.profiler = null;
LOG.info("Profiling of jobs is disabled.");
}
// Get the directory for storing temporary files
final String[] tmpDirPaths = GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|"+File.pathSeparator);
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(",|" + File.pathSeparator);
checkTempDirs(tmpDirPaths);
final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
int numBuffers = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
......@@ -291,8 +294,6 @@ public class TaskManager implements TaskOperationProtocol {
ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);
// Initialize the channel manager
try {
NetworkConnectionManager networkConnectionManager = null;
......@@ -340,38 +341,70 @@ public class TaskManager implements TaskOperationProtocol {
if (memorySize > 0) {
// manually configured memory size. override the value in the hardware config
resources = HardwareDescriptionFactory.construct(resources.getNumberOfCPUCores(),
resources.getSizeOfPhysicalMemory(), memorySize * 1024L * 1024L);
resources.getSizeOfPhysicalMemory(), memorySize * 1024L * 1024L);
}
this.hardwareDescription = resources;
// Initialize the memory manager
LOG.info("Initializing memory manager with " + (resources.getSizeOfFreeMemory() >>> 20) + " megabytes of memory. " +
"Page size is " + pageSize + " bytes.");
try {
@SuppressWarnings("unused")
final boolean lazyAllocation = GlobalConfiguration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION);
ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION);
this.memoryManager = new DefaultMemoryManager(resources.getSizeOfFreeMemory(), pageSize);
} catch (Throwable t) {
LOG.fatal("Unable to initialize memory manager with " + (resources.getSizeOfFreeMemory() >>> 20)
+ " megabytes of memory.", t);
+ " megabytes of memory.", t);
throw new Exception("Unable to initialize memory manager.", t);
}
}
this.ioManager = new IOManager(tmpDirPaths);
this.heartbeatThread = new Thread() {
@Override
public void run() {
runHeartbeatLoop();
}
};
this.heartbeatThread.setName("Heartbeat Thread");
this.heartbeatThread.start();
// --------------------------------------------------------------------
// Memory Usage
// --------------------------------------------------------------------
final MemoryMXBean memoryUsageBean = ManagementFactory.getMemoryMXBean();
LOG.info(getMemoryUsageAsString(memoryUsageBean));
boolean startMemoryUsageLogThread = GlobalConfiguration.getBoolean(
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD,
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_START_LOG_THREAD);
if (startMemoryUsageLogThread && LOG.isDebugEnabled()) {
final int logIntervalMs = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS,
ConfigConstants.DEFAULT_TASK_MANAGER_DEBUG_MEMORY_USAGE_LOG_INTERVAL_MS);
new Thread(new Runnable() {
@Override
public void run() {
try {
while (!isShutDown()) {
Thread.sleep(logIntervalMs);
LOG.debug(getMemoryUsageAsString(memoryUsageBean));
}
} catch (InterruptedException e) {
LOG.warn("Unexpected interruption of memory usage logger thread.");
}
}
}).start();
}
}
private int getAvailablePort() {
......@@ -502,7 +535,6 @@ public class TaskManager implements TaskOperationProtocol {
}
}
/**
* The states of address detection mechanism.
* There is only a state transition if the current state failed to determine the address.
......@@ -1030,4 +1062,24 @@ public class TaskManager implements TaskOperationProtocol {
}
}
}
/**
 * Builds a one-line, human-readable summary of the current heap and non-heap memory usage
 * reported by the given memory MXBean.
 *
 * <p>All sizes are reported in whole megabytes (bytes divided by 2^20, truncated). A maximum
 * that the JVM reports as undefined ({@link MemoryUsage#getMax()} returns -1) is printed
 * as -1 instead of being truncated to a misleading 0.
 *
 * @param memoryUsageBean the memory MXBean to read the heap and non-heap usage from
 * @return the formatted message with used/committed/max values for heap and non-heap memory
 */
private String getMemoryUsageAsString(MemoryMXBean memoryUsageBean) {
	final MemoryUsage heap = memoryUsageBean.getHeapMemoryUsage();
	final MemoryUsage nonHeap = memoryUsageBean.getNonHeapMemoryUsage();

	// Bytes per megabyte; kept as long so the divisions below stay in 64-bit arithmetic.
	final long mb = 1L << 20;

	final long heapUsed = heap.getUsed() / mb;
	final long heapCommitted = heap.getCommitted() / mb;
	// getMax() is -1 when the maximum is undefined; dividing would turn that into 0.
	final long heapMax = heap.getMax() < 0 ? -1 : heap.getMax() / mb;

	final long nonHeapUsed = nonHeap.getUsed() / mb;
	final long nonHeapCommitted = nonHeap.getCommitted() / mb;
	// The non-heap maximum is frequently undefined, so the same guard applies here.
	final long nonHeapMax = nonHeap.getMax() < 0 ? -1 : nonHeap.getMax() / mb;

	// The message text is reproduced verbatim (including the legend's spelling) so the
	// emitted log line keeps its existing shape.
	return String.format("Memory usage HEAP: %d/%d/%d MB, NON HEAP: %d/%d/%d MB (used/comitted/max)",
			heapUsed, heapCommitted, heapMax, nonHeapUsed, nonHeapCommitted, nonHeapMax);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册