diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java index 86a6efa8909e3fd56b7f91613c98b9c5b74699e8..76ee1f9301051f99ee37e87929cd31c6a03d0984 100644 --- a/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java +++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/minicluster/NepheleMiniCluster.java @@ -272,7 +272,7 @@ public class NepheleMiniCluster { config.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, defaultOverwriteFiles); config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, defaultAlwaysCreateDirectory); - if(memorySize < 0){ + if (memorySize < 0){ memorySize = HardwareDescriptionFactory.extractFromSystem().getSizeOfFreeMemory(); // at this time, we need to scale down the memory, because we cannot dedicate all free memory to the @@ -282,9 +282,12 @@ public class NepheleMiniCluster { GlobalConfiguration.getLong(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE); - memorySize = (long) (0.8 * (memorySize - bufferMem)); + memorySize = memorySize - bufferMem; + + // apply the fraction that makes sure memory is left to the heap for other data structures and UDFs. + memorySize = (long) (memorySize * ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); - //convert from bytes to mega bytes + //convert from bytes to megabytes memorySize >>>= 20; } diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java index 0c5e218c758db37b04b532fb2d81ba98209fa9a5..58da6642af309fbe9e635180e3c8ebd721dff763 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/TaskManager.java @@ -334,9 +334,8 @@ public class TaskManager implements TaskOperationProtocol { throw new Exception("Failed to instantiate ChannelManager.", ioe); } + // initialize the number of slots { - HardwareDescription resources = HardwareDescriptionFactory.extractFromSystem(); - int slots = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, -1); if (slots == -1) { slots = 1; @@ -347,23 +346,34 @@ public class TaskManager implements TaskOperationProtocol { LOG.info("Creating " + slots + " task slot(s)."); } this.numberOfSlots = slots; + } + + this.hardwareDescription = HardwareDescriptionFactory.extractFromSystem(); + + // initialize the memory manager + { + // Check whether the memory size has been explicitly configured. + final long configuredMemorySize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1); + final long memorySize; - // Check whether the memory size has been explicitly configured. if so that overrides the default mechanism - // of taking as much as is mentioned in the hardware description - long memorySize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1); - - if (memorySize > 0) { - // manually configured memory size. override the value in the hardware config - resources = HardwareDescriptionFactory.construct(resources.getNumberOfCPUCores(), - resources.getSizeOfPhysicalMemory(), memorySize * 1024L * 1024L); + if (configuredMemorySize == -1) { + // no manually configured memory. take a relative fraction of the free heap space + float fraction = GlobalConfiguration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION); + memorySize = (long) (this.hardwareDescription.getSizeOfFreeMemory() * fraction); + LOG.info("Using " + fraction + " of the free heap space for managed memory."); + } + else if (configuredMemorySize <= 0) { + throw new Exception("Invalid value for Memory Manager memory size: " + configuredMemorySize); + } + else { + memorySize = configuredMemorySize << 20; } - this.hardwareDescription = resources; final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE); // Initialize the memory manager - LOG.info("Initializing memory manager with " + (resources.getSizeOfFreeMemory() >>> 20) + " megabytes of memory. " + + LOG.info("Initializing memory manager with " + (memorySize >>> 20) + " megabytes of memory. " + "Page size is " + pageSize + " bytes."); try { @@ -371,11 +381,9 @@ public class TaskManager implements TaskOperationProtocol { final boolean lazyAllocation = GlobalConfiguration.getBoolean(ConfigConstants.TASK_MANAGER_MEMORY_LAZY_ALLOCATION_KEY, ConfigConstants.DEFAULT_TASK_MANAGER_MEMORY_LAZY_ALLOCATION); - this.memoryManager = new DefaultMemoryManager(resources.getSizeOfFreeMemory(), this.numberOfSlots, - pageSize); + this.memoryManager = new DefaultMemoryManager(memorySize, this.numberOfSlots, pageSize); } catch (Throwable t) { - LOG.fatal("Unable to initialize memory manager with " + (resources.getSizeOfFreeMemory() >>> 20) - + " megabytes of memory.", t); + LOG.fatal("Unable to initialize memory manager with " + (memorySize >>> 20) + " megabytes of memory.", t); throw new Exception("Unable to initialize memory manager.", t); } }