提交 944511cf 编写于 作者: S sewen

Cleanup and clustermanager bug fix.

上级 d847b7a3
......@@ -21,6 +21,7 @@ import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
......@@ -39,6 +40,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import eu.stratosphere.nephele.configuration.ConfigConstants;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.instance.AllocatedResource;
......@@ -100,24 +102,30 @@ import eu.stratosphere.nephele.topology.NetworkTopology;
*/
public class ClusterManager implements InstanceManager {
/** Logging facility */
// ------------------------------------------------------------------------
// Internal Constants
// ------------------------------------------------------------------------
/**
* Logging facility
*/
private static final Log LOG = LogFactory.getLog(ClusterManager.class);
/**
* Period after which we check whether hosts did not send heart-beat
* messages.
* The name of the file which contains the IP to instance type mapping.
*/
private static final int BASE_INTERVAL = 10 * 1000; // 10 sec.
private static final String SLAVE_FILE_NAME = "slaves";
/**
* The key to retrieve the index of the default instance type from the configuration.
*
*/
private static final String DEFAULT_INSTANCE_TYPE_INDEX_KEY = "jobmanager.instancemanager.cluster.defaulttype";
private final static String CONFIG_DIR_KEY = "config.dir";
/**
* The key to retrieve the clean up interval from the configuration.
* Period after which we check whether hosts did not send heart-beat
* messages.
*/
private static final String CLEANUP_INTERVAL_KEY = "jobmanager.instancemanager.cluster.cleanupinterval";
private static final int BASE_INTERVAL = 10 * 1000; // 10 sec.
/**
* Default duration after which a host is purged in case it did not send
......@@ -125,31 +133,22 @@ public class ClusterManager implements InstanceManager {
*/
private static final int DEFAULT_CLEANUP_INTERVAL = 2 * 60; // 2 min.
// ------------------------------------------------------------------------
// Fields
// ------------------------------------------------------------------------
/**
* Duration after which a host is purged in case it did not send a
* heart-beat message.
*/
private final long cleanUpInterval;
/** Object that is notified if instances become available or vanish */
private InstanceListener instanceListener;
/**
* The key prefix to retrieve the definition of the individual instance types.
* The index of the default instance type.
*/
private static final String INSTANCE_TYPE_PREFIX_KEY = "jobmanager.instancemanager.cluster.type.";
/**
* The default definition for an instance type, if no other configuration is provided.
*/
private static final String DEFAULT_INSTANCE_TYPE = "default,2,1,2048,10,10";
private static final int DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX = 1;
private final int defaultInstanceTypeIndex;
private final static String CONFIG_DIR_KEY = "config.dir";
/**
* Set of hosts known to run a task manager that are thus able to execute
* tasks.
......@@ -164,11 +163,6 @@ public class ClusterManager implements InstanceManager {
*/
private final Multimap<JobID, AllocatedSlice> slicesOfJob = HashMultimap.create();
/**
* The name of the file which contains the IP to instance type mapping.
*/
private static final String SLAVE_FILE_NAME = "slaves";
/**
* Map of IP addresses to instance types.
*/
......@@ -180,7 +174,15 @@ public class ClusterManager implements InstanceManager {
*/
private final InstanceType[] availableInstanceTypes;
/**
*
*/
private final NetworkTopology networkTopology;
/**
* Object that is notified if instances become available or vanish
*/
private InstanceListener instanceListener;
/**
* Periodic task that checks whether hosts have not sent their heart-beat
......@@ -223,31 +225,40 @@ public class ClusterManager implements InstanceManager {
}
};
// ------------------------------------------------------------------------
// Constructor and set-up
// ------------------------------------------------------------------------
/**
* Constructor.
*/
public ClusterManager() {
// Load the instance type this cloud can offer
this.availableInstanceTypes = populateInstanceTypeArray();
this.availableInstanceTypes = populateInstanceTypeArray(LOG);
long tmpCleanUpInterval = (long) GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;
long tmpCleanUpInterval = (long) GlobalConfiguration.getInteger(
ConfigConstants.INSTANCE_MANAGER_CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;
if ((tmpCleanUpInterval % BASE_INTERVAL) != 0) {
LOG.warn("Invalid clean up interval. Reverting to " + DEFAULT_CLEANUP_INTERVAL);
LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of " +
DEFAULT_CLEANUP_INTERVAL + " secs.");
tmpCleanUpInterval = DEFAULT_CLEANUP_INTERVAL;
}
this.cleanUpInterval = tmpCleanUpInterval;
int tmpDefaultInstanceTypeIndex = GlobalConfiguration.getInteger(DEFAULT_INSTANCE_TYPE_INDEX_KEY,
DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX);
if (tmpDefaultInstanceTypeIndex >= this.availableInstanceTypes.length) {
LOG.warn("Incorrect index to for default instance type, switching to default "
+ DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX);
tmpDefaultInstanceTypeIndex = DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX;
int tmpDefaultInstanceTypeIndex = GlobalConfiguration.getInteger(
ConfigConstants.INSTANCE_MANAGER_DEFAULT_INSTANCE_TYPE_INDEX_KEY,
ConfigConstants.DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX);
if (tmpDefaultInstanceTypeIndex > this.availableInstanceTypes.length) {
LOG.warn("Incorrect index to for default instance type (" + tmpDefaultInstanceTypeIndex +
"), switching to default index " + ConfigConstants.DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX);
tmpDefaultInstanceTypeIndex = ConfigConstants.DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX;
}
this.defaultInstanceTypeIndex = tmpDefaultInstanceTypeIndex;
this.defaultInstanceTypeIndex = tmpDefaultInstanceTypeIndex - 1;
this.networkTopology = loadNetworkTopology();
......@@ -310,22 +321,25 @@ public class ClusterManager implements InstanceManager {
* @return list of available instance types sorted by price (cheapest to
* most expensive)
*/
private InstanceType[] populateInstanceTypeArray() {
public static InstanceType[] populateInstanceTypeArray(Log log) {
final List<InstanceType> instanceTypes = Lists.newArrayList();
final List<InstanceType> instanceTypes = new ArrayList<InstanceType>();
// read instance types
int count = 1;
while (true) {
final String key = INSTANCE_TYPE_PREFIX_KEY + Integer.toString(count);
final String key = ConfigConstants.INSTANCE_MANAGER_INSTANCE_TYPE_PREFIX_KEY + Integer.toString(count);
String descr = GlobalConfiguration.getString(key, null);
if (descr == null) {
if (count == 1) {
LOG
.error("Configuration does not contain at least one definition for an instance type, using default "
+ DEFAULT_INSTANCE_TYPE);
descr = DEFAULT_INSTANCE_TYPE;
if (log != null) {
LOG.error("Configuration does not contain at least one definition for an instance type, " +
"using default instance type: "+ ConfigConstants.DEFAULT_INSTANCE_TYPE);
}
descr = ConfigConstants.DEFAULT_INSTANCE_TYPE;
} else {
break;
}
......@@ -335,23 +349,32 @@ public class ClusterManager implements InstanceManager {
try {
// if successful add new instance type
instanceTypes.add(InstanceType.getTypeFromString(descr));
} catch (Throwable t) {
LOG.error("Error parsing " + key + ":" + descr, t);
}
catch (Throwable t) {
if (log != null) {
LOG.error("Error parsing " + key + ":" + descr + ". Using default using default instance type: " +
ConfigConstants.DEFAULT_INSTANCE_TYPE + " for instance type " + count + ".", t);
}
// wee need to add an instance type anyways, because otherwise a non-parsable instance description
// would cause the numbering to be wrong.
instanceTypes.add(InstanceType.getTypeFromString(ConfigConstants.DEFAULT_INSTANCE_TYPE));
}
// Increase key index
++count;
}
// sort by price
Collections.sort(instanceTypes, new Comparator<InstanceType>() {
@Override
public int compare(InstanceType o1, InstanceType o2) {
return o1.getPricePerHour() - o2.getPricePerHour();
}
});
// cannot be done since this changes the order and makes the default instance type index wrong
// sort by price
// Collections.sort(instanceTypes, new Comparator<InstanceType>() {
// @Override
// public int compare(InstanceType o1, InstanceType o2) {
// return o1.getPricePerHour() - o2.getPricePerHour();
// }
// });
return instanceTypes.toArray(new InstanceType[0]);
return instanceTypes.toArray(new InstanceType[instanceTypes.size()]);
}
/**
......@@ -428,7 +451,7 @@ public class ClusterManager implements InstanceManager {
@Override
public InstanceType getDefaultInstanceType() {
return this.availableInstanceTypes[this.defaultInstanceTypeIndex - 1];
return this.availableInstanceTypes[this.defaultInstanceTypeIndex];
}
/**
......
......@@ -16,15 +16,6 @@
<packaging>jar</packaging>
<!--<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.7</version>
<scope>test</scope>
</dependency>
</dependencies>-->
<!-- FIXME: (aa) don't know why, this doesn't seem to work (try "/nephele-common$ mvn surefire:test") -->
<!--<build>
<plugins>
......
......@@ -22,12 +22,13 @@ package eu.stratosphere.nephele.configuration;
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
*/
public final class ConfigConstants {
// ------------------------------------------------------------------------
// Configuration Keys
// Configuration Keys
// ------------------------------------------------------------------------
// -------------------------- Addresses and Ports -------------------------
/**
* The key for the config parameter defining the network address to connect to
* for communication with the job manager.
......@@ -56,24 +57,22 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_USE_DISCOVERY_KEY = "taskmanager.setup.usediscovery";
/**
* The key for the config parameter defining the amount of memory available for the task manager's
* memory manager (in megabytes).
* The key for the config parameter defining the directory for temporary files.
*/
public static final String MEMORY_MANAGER_AVAILABLE_MEMORY_SIZE_KEY = "taskmanager.memory.size";
public static final String TASK_MANAGER_TMP_DIR_KEY = "taskmanager.tmp.dir";
/**
* The key for the config parameter defining the amount of memory available for the task manager's
* memory manager (as a fraction of the whole available memory).
* memory manager (in megabytes).
*/
public static final String MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION_KEY = "taskmanager.memory.fraction";
public static final String MEMORY_MANAGER_AVAILABLE_MEMORY_SIZE_KEY = "taskmanager.memory.size";
/**
* The key for the config parameter defining the directory for temporary files.
* The key for the config parameter defining flag to terminate a job at job-client shutdown.
*/
public static final String TASK_MANAGER_TMP_DIR_KEY = "taskmanager.tmp.dir";
public static final String JOBCLIENT_SHUTDOWN_TERMINATEJOB_KEY = "jobclient.shutdown.terminatejob";
// ----------------------------- Instances --------------------------------
/**
......@@ -81,8 +80,23 @@ public final class ConfigConstants {
*/
public static final String JOBMANAGER_LOCALINSTANCE_TYPE_KEY = "jobmanager.instancemanager.local.type";
/**
* The key prefix for the config parameters that define the different available instance types.
*/
public static final String INSTANCE_MANAGER_INSTANCE_TYPE_PREFIX_KEY = "jobmanager.instancemanager.cluster.type.";
/**
* The key to retrieve the index of the default instance type from the configuration.
*/
public static final String INSTANCE_MANAGER_DEFAULT_INSTANCE_TYPE_INDEX_KEY = "jobmanager.instancemanager.cluster.defaulttype";
/**
* The key to retrieve the clean up interval from the configuration.
*/
public static final String INSTANCE_MANAGER_CLEANUP_INTERVAL_KEY = "jobmanager.instancemanager.cluster.cleanupinterval";
// ------------------------------------------------------------------------
// Default Values
// Default Values
// ------------------------------------------------------------------------
/**
......@@ -108,25 +122,36 @@ public final class ConfigConstants {
/**
* The default amount of memory assigned to each task manager (in megabytes).
*/
public static final int DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY = -1;
public static final int DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY = 512;
/**
* The default minimal amount of memory that the memory manager does not occupy (in megabytes).
*/
public static final long DEFAULT_MEMORY_MANAGER_MIN_UNRESERVED_MEMORY = 256 * 1024 * 1024;
/**
* The default amount of memory assigned to each task manager (as a fraction of the free memory).
*/
public static final float DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION = 0.7f;
/**
* The default directory for temporary files of the task manager
*/
public static final String DEFAULT_TASK_MANAGER_TMP_PATH = System.getProperty("java.io.tmpdir");
/**
* The default value for the flag to terminate a job on job-client shutdown.
*/
public static final boolean DEFAULT_JOBCLIENT_SHUTDOWN_TERMINATEJOB = true;
// ----------------------------- Instances --------------------------------
/**
* The default definition for an instance type, if no other configuration is provided.
*/
public static final String DEFAULT_INSTANCE_TYPE = "default,2,1,1024,10,10";
/**
* The default index for the default instance type.
*/
public static final int DEFAULT_DEFAULT_INSTANCE_TYPE_INDEX = 1;
// ------------------------------------------------------------------------
/**
......
......@@ -22,8 +22,12 @@ public class LocalTaskManagerThread extends Thread {
private TaskManager taskManager;
public LocalTaskManagerThread(String configDir) {
this.taskManager = new TaskManager(configDir);
try {
this.taskManager = new TaskManager(configDir);
}
catch (Exception e) {
throw new RuntimeException("Could not start local TaskManager: " + e.getMessage(), e);
}
}
@Override
......
......@@ -147,7 +147,7 @@ public class TaskManager implements TaskOperationProtocol {
* @param configDir
* the directory containing the configuration files for the task manager
*/
public TaskManager(String configDir) {
public TaskManager(String configDir) throws Exception {
// First, try to load global configuration
GlobalConfiguration.loadConfiguration(configDir);
......@@ -158,8 +158,7 @@ public class TaskManager implements TaskOperationProtocol {
try {
jobManagerAddress = DiscoveryService.getJobManagerAddress();
} catch (DiscoveryException e) {
e.printStackTrace();
System.exit(FAILURERETURNCODE);
throw new Exception("Failed to initialize discovery service. " + e.getMessage(), e);
}
} else {
......@@ -194,7 +193,7 @@ public class TaskManager implements TaskOperationProtocol {
.getSocketFactory());
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(FAILURERETURNCODE);
throw new Exception("Failed to initialize connection to JobManager. " + e.getMessage(), e);
}
this.jobManager = jobManager;
......@@ -205,7 +204,7 @@ public class TaskManager implements TaskOperationProtocol {
NetUtils.getSocketFactory());
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(FAILURERETURNCODE);
throw new Exception("Failed to initialize channel lookup protocol. " + e.getMessage(), e);
}
this.lookupService = lookupService;
......@@ -217,7 +216,7 @@ public class TaskManager implements TaskOperationProtocol {
taskManagerServer.start();
} catch (IOException e) {
LOG.error(StringUtils.stringifyException(e));
System.exit(FAILURERETURNCODE);
throw new Exception("Failed to taskmanager server. " + e.getMessage(), e);
}
this.taskManagerServer = taskManagerServer;
......@@ -226,8 +225,8 @@ public class TaskManager implements TaskOperationProtocol {
final String profilerClassName = GlobalConfiguration.getString(ProfilingUtils.TASKMANAGER_CLASSNAME_KEY,
null);
if (profilerClassName == null) {
LOG.error("Cannot find class name for the profiler");
System.exit(FAILURERETURNCODE);
LOG.error("Cannot find class name for the profiler.");
throw new Exception("Cannot find class name for the profiler.");
}
this.profiler = ProfilingUtils.loadTaskManagerProfiler(profilerClassName, jobManagerAddress.getAddress(),
this.localInstanceConnectionInfo);
......@@ -248,8 +247,7 @@ public class TaskManager implements TaskOperationProtocol {
tmpDirPath);
} catch (IOException ioe) {
LOG.error(StringUtils.stringifyException(ioe));
ioe.printStackTrace();
System.exit(FAILURERETURNCODE);
throw new Exception("Failed to instantiate Byte-buffered channel manager. " + ioe.getMessage(), ioe);
}
this.byteBufferedChannelManager = byteBufferedChannelManager;
......@@ -260,32 +258,20 @@ public class TaskManager implements TaskOperationProtocol {
this.checkpointManager = new CheckpointManager(this.byteBufferedChannelManager, tmpDirPath);
// Initialize the memory manager
long memorySize = GlobalConfiguration.getInteger(ConfigConstants.MEMORY_MANAGER_AVAILABLE_MEMORY_SIZE_KEY,
ConfigConstants.DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY);
long memorySize = GlobalConfiguration.getInteger(ConfigConstants.MEMORY_MANAGER_AVAILABLE_MEMORY_SIZE_KEY, -1);
if (memorySize < 1) {
// get the fraction configuration
String mss = GlobalConfiguration.getString(ConfigConstants.MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION_KEY,
String.valueOf(ConfigConstants.DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION));
float fract = ConfigConstants.DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION;
try {
fract = Float.parseFloat(mss);
} catch (NumberFormatException nfex) {
LOG.warn("Invalid parameter for " + ConfigConstants.MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION_KEY
+ " in the configuration. Using default value of "
+ ConfigConstants.DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION
+ " for the memory fraction dedicated to the MemoryManager.");
}
LOG.info("Initializing MemoryManager with a fraction of " + fract + " of the total free memory.");
this.memoryManager = DefaultMemoryManager.getWithHeapFraction(fract,
ConfigConstants.DEFAULT_MEMORY_MANAGER_MIN_UNRESERVED_MEMORY);
} else {
LOG.info("Initializing memory manager with " + memorySize + " megabytes of memory");
this.memoryManager = new DefaultMemoryManager(memorySize * 1024L * 1024L);
memorySize = ConfigConstants.DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY;
LOG.warn("Memory manager size (" + ConfigConstants.MEMORY_MANAGER_AVAILABLE_MEMORY_SIZE_KEY +
") undefined for this task manager. Using default memory size of " +
ConfigConstants.DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY + "MB.");
}
LOG.info("Initializing memory manager with " + memorySize + " megabytes of memory");
this.memoryManager = new DefaultMemoryManager(memorySize * 1024L * 1024L);
// Initialize the io manager
// Initialize the I/O manager
this.ioManager = new IOManager(tmpDirPath);
// Add shutdown hook for clean up tasks
......@@ -319,7 +305,15 @@ public class TaskManager implements TaskOperationProtocol {
String configDir = line.getOptionValue(configDirOpt.getOpt(), null);
// Create a new task manager object
TaskManager taskManager = new TaskManager(configDir);
TaskManager taskManager = null;
try {
taskManager = new TaskManager(configDir);
}
catch (Throwable t) {
System.err.println("Taskmanager startup failed:" + t.getMessage());
t.printStackTrace(System.err);
System.exit(FAILURERETURNCODE);
}
// Run the main I/O loop
taskManager.runIOLoop();
......
......@@ -71,7 +71,7 @@ public abstract class InputFormat<K extends Key, V extends Value> extends Stub<K
public abstract boolean nextPair(KeyValuePair<K, V> pair) throws IOException;
/**
* Method used to check if the end of the input is reached
* Method used to check if the end of the input is reached.
*
* @return returns true if the end is reached, otherwise false
* @throws IOException
......@@ -80,7 +80,7 @@ public abstract class InputFormat<K extends Key, V extends Value> extends Stub<K
public abstract boolean reachedEnd() throws IOException;
/**
* Conncets the input stream to the input format
* Conncets the input stream to the input format.
*
* @param fdis
* @param start
......@@ -94,11 +94,4 @@ public abstract class InputFormat<K extends Key, V extends Value> extends Stub<K
this.bufferSize = bufferSize;
}
// IDEA: Implement filter interface so that records can directly be skipped
// if e.g. the key does not match and that way the values don't need
// to be read
// ... should probably done by a configuration parameter not by a
// setter method
// public void setFilter(InputFilter f);
}
......@@ -35,16 +35,6 @@ import eu.stratosphere.pact.common.type.Value;
public abstract class OutputFormat<K extends Key, V extends Value> extends Stub<K, V> {
protected FSDataOutputStream stream;
/**
* Creates a KeyValue pair that can be used together with the writePair()
* method. The runtime will try to reuse the pair for several writePair()
* calls.
*
* @return
*/
// TODO: seems to be unnecessary
public abstract KeyValuePair<K, V> createPair();
/**
* Writes the pair to the underlying output stream.
*
......
......@@ -35,6 +35,7 @@ import eu.stratosphere.pact.common.type.Value;
* @param <V>
*/
public abstract class TextInputFormat<K extends Key, V extends Value> extends InputFormat<K, V> {
public static final String FORMAT_PAIR_DELIMITER = "delimiter";
private byte[] readBuffer;
......@@ -87,11 +88,15 @@ public abstract class TextInputFormat<K extends Key, V extends Value> extends In
@Override
public void configure(Configuration parameters) {
String delimString = parameters.getString(FORMAT_PAIR_DELIMITER, "\n");
if (delimString != null && delimString.length() != 1) {
if (delimString == null) {
throw new IllegalArgumentException("The delimiter not be null.");
}
else if (delimString.length() != 1) {
throw new IllegalArgumentException("The delimiter must currently be a single char string.");
}
delimiter = delimString != null ? (byte) delimString.charAt(0) : -1;
delimiter = (byte) delimString.charAt(0);
}
/**
......@@ -106,9 +111,6 @@ public abstract class TextInputFormat<K extends Key, V extends Value> extends In
this.overLimit = false;
this.end = false;
// TODO: Set delimiter
// this.delimiter = delimiter;
try {
if (start != 0) {
stream.seek(start);
......@@ -136,10 +138,6 @@ public abstract class TextInputFormat<K extends Key, V extends Value> extends In
public void close() {
wrapBuffer = null;
readBuffer = null;
// if (stream != null) {
// stream.close();
// }
}
/**
......
......@@ -45,20 +45,6 @@ public abstract class TextOutputFormat<K extends Key, V extends Value> extends O
*/
public abstract byte[] writeLine(KeyValuePair<K, V> pair);
/**
* {@inheritDoc}
*/
@Override
public KeyValuePair<K, V> createPair() {
try {
return new KeyValuePair<K, V>(ok.newInstance(), ov.newInstance());
} catch (InstantiationException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
/**
* {@inheritDoc}
*/
......
......@@ -21,11 +21,12 @@ package eu.stratosphere.pact.common.util;
* @author Stephan Ewen (stephan.ewen@tu-berlin.de)
*/
public final class PactConfigConstants {
// ------------------------------------------------------------------------
// Configuration Keys
// Configuration Keys
// ------------------------------------------------------------------------
// ------------------------------ Parallelism -----------------------------
// ---------------------------- Parallelism -------------------------------
/**
* The key for the config parameter defining the default degree of parallelization for user functions.
......@@ -89,7 +90,7 @@ public final class PactConfigConstants {
/**
* The default degree of parallelism for PACT user functions.
*/
public static final int DEFAULT_PARALLELIZATION_DEGREE = 2;
public static final int DEFAULT_PARALLELIZATION_DEGREE = 1;
/**
* The default intra-node parallelism
......@@ -105,7 +106,7 @@ public final class PactConfigConstants {
/**
* The description of the default instance type that is booked for the execution of PACT tasks.
*/
public static final String DEFAULT_INSTANCE_TYPE_DESCRIPTION = "standard,2,1,2048,10,10";
public static final String DEFAULT_INSTANCE_TYPE_DESCRIPTION = "standard,2,1,300,10,0";
// ----------------------------- Web Frontend -----------------------------
......
......@@ -25,7 +25,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import eu.stratosphere.nephele.configuration.ConfigConstants;
import eu.stratosphere.nephele.configuration.Configuration;
import eu.stratosphere.nephele.configuration.GlobalConfiguration;
import eu.stratosphere.nephele.instance.InstanceType;
......@@ -324,19 +323,23 @@ public class PactCompiler {
* @param estimator
* The <tt>CostEstimator</tt> to use to cost the individual operations.
*/
public PactCompiler(DataStatistics stats, CostEstimator estimator) {
public PactCompiler(DataStatistics stats, CostEstimator estimator)
{
this.statistics = stats;
this.costEstimator = estimator;
// get the instance type to schedule pact tasks on
Configuration config = GlobalConfiguration.getConfiguration();
// get the instance type to schedule pact tasks on
String instanceDescr = config.getString(PactConfigConstants.DEFAULT_INSTANCE_TYPE_KEY,
PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION);
InstanceType type = null;
try {
type = InstanceType.getTypeFromString(instanceDescr);
} catch (IllegalArgumentException iaex) {
LOG.error("Invalid description of standard instance type in PACT configuration: " + instanceDescr, iaex);
}
catch (IllegalArgumentException iaex) {
LOG.error("Invalid description of standard instance type in PACT configuration: " + instanceDescr +
". Using default instance type " + PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION + ".", iaex);
type = InstanceType.getTypeFromString(PactConfigConstants.DEFAULT_INSTANCE_TYPE_DESCRIPTION);
}
this.pactInstanceType = type;
......@@ -345,6 +348,8 @@ public class PactCompiler {
int defaultParallelizationDegree = config.getInteger(PactConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
PactConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
if (defaultParallelizationDegree < 1) {
LOG.error("Invalid default degree of parallelism: " + defaultParallelizationDegree +
". Using default degree of " + PactConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + ".");
defaultParallelizationDegree = PactConfigConstants.DEFAULT_PARALLELIZATION_DEGREE;
}
this.defaultDegreeOfParallelism = defaultParallelizationDegree;
......@@ -353,25 +358,14 @@ public class PactCompiler {
int defaultInNodePar = config.getInteger(PactConfigConstants.DEFAULT_PARALLELIZATION_INTRA_NODE_DEGREE_KEY,
PactConfigConstants.DEFAULT_INTRA_NODE_PARALLELIZATION_DEGREE);
if (defaultInNodePar < 1) {
LOG.error("Invalid default degree of intra-node parallelism: " + defaultParallelizationDegree +
". Using default degree of " + PactConfigConstants.DEFAULT_INTRA_NODE_PARALLELIZATION_DEGREE + ".");
defaultInNodePar = PactConfigConstants.DEFAULT_INTRA_NODE_PARALLELIZATION_DEGREE;
}
this.defaultIntraNodeParallelism = defaultInNodePar;
// compute the amount of memory usable per instance
int memory = config.getInteger(ConfigConstants.MEMORY_MANAGER_AVAILABLE_MEMORY_SIZE_KEY, -1);
if (memory == -1) {
String frac = config.getString(ConfigConstants.MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION_KEY, String
.valueOf(ConfigConstants.DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION));
float fraction;
try {
fraction = Float.parseFloat(frac);
} catch (NumberFormatException nfex) {
fraction = ConfigConstants.DEFAULT_MEMORY_MANAGER_AVAILABLE_MEMORY_FRACTION;
}
memory = (int) (pactInstanceType.getMemorySize() * fraction);
}
this.memoryPerInstance = memory;
// get the amount of memory usable per instance
this.memoryPerInstance = pactInstanceType.getMemorySize();
}
// ------------------------------------------------------------------------
......
......@@ -849,7 +849,7 @@ public class JobGraphGenerator implements Visitor<OptimizerNode> {
numSortBuffers = sortMem / MAX_SORT_HEAP_BUFFER_SIZE + 1;
// correct rounding loss
numSortBuffers = sortMem / (sortMem / numSortBuffers);
} else if (sortMem > 3 * 64) {
} else if (sortMem > 3 * 16) {
numSortBuffers = 3;
} else if (sortMem >= 2 * MIN_SORT_HEAP) {
numSortBuffers = 2;
......
......@@ -27,8 +27,10 @@ import eu.stratosphere.pact.common.type.KeyValuePair;
import eu.stratosphere.pact.common.type.Value;
import eu.stratosphere.pact.runtime.task.util.MatchTaskIterator;
@SuppressWarnings("unchecked")
@SuppressWarnings("rawtypes")
public class InMemoryHashMatchIterator implements MatchTaskIterator {
private Reader<? extends KeyValuePair> readerBuild;
private Reader<? extends KeyValuePair> readerProbe;
......@@ -109,6 +111,7 @@ public class InMemoryHashMatchIterator implements MatchTaskIterator {
return false;
}
@SuppressWarnings("unchecked")
private void buildHash() throws IOException, InterruptedException {
build = new HashMap<Key, Collection<Value>>();
......
......@@ -97,7 +97,8 @@ public class CombiningUnilateralSortMerger<K extends Key, V extends Value> exten
SerializationFactory<K> keySerialization, SerializationFactory<V> valueSerialization,
Comparator<K> keyComparator, Reader<KeyValuePair<K, V>> reader, float offsetArrayPerc,
AbstractTask parentTask, boolean combineLastMerge)
throws IOException, MemoryAllocationException {
throws IOException, MemoryAllocationException
{
super(memoryManager, ioManager, numSortBuffers, sizeSortBuffer, ioMemorySize, maxNumFileHandles,
keySerialization, valueSerialization, keyComparator, reader, offsetArrayPerc, parentTask);
......@@ -123,8 +124,11 @@ public class CombiningUnilateralSortMerger<K extends Key, V extends Value> exten
*/
@Override
protected ThreadBase getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues,
MemoryManager memoryManager, IOManager ioManager, int ioMemorySize, AbstractTask parentTask) {
return new SpillingThread(exceptionHandler, queues, memoryManager, ioManager, ioMemorySize, parentTask);
MemoryManager memoryManager, IOManager ioManager, int ioMemorySize, AbstractTask parentTask,
int buffersToKeepBeforeSpilling)
{
return new SpillingThread(exceptionHandler, queues, memoryManager, ioManager, ioMemorySize,
parentTask, buffersToKeepBeforeSpilling);
}
// ------------------------------------------------------------------------
......@@ -212,15 +216,20 @@ public class CombiningUnilateralSortMerger<K extends Key, V extends Value> exten
private final IOManager ioManager;
private final int ioMemorySize;
private final int buffersToKeepBeforeSpilling;
public SpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues,
MemoryManager memoryManager, IOManager ioManager, int ioMemorySize, AbstractTask parentTask) {
MemoryManager memoryManager, IOManager ioManager, int ioMemorySize, AbstractTask parentTask,
int buffersToKeepBeforeSpilling)
{
super(exceptionHandler, "SortMerger spilling thread", queues, parentTask);
// members
this.memoryManager = memoryManager;
this.ioManager = ioManager;
this.ioMemorySize = ioMemorySize;
this.buffersToKeepBeforeSpilling = buffersToKeepBeforeSpilling;
}
/**
......
......@@ -226,8 +226,16 @@ public class UnilateralSortMerger<K extends Key, V extends Value> implements Sor
// start the thread that handles spilling to secondary storage
spillThread = getSpillingThread(exceptionHandler, circularQueues, memoryManager, ioManager, ioMemorySize,
parentTask);
parentTask, numSortBuffers >= 3 ? numSortBuffers - 2 : 0);
startThreads();
}
/**
* Starts all the threads that are used by this sort-merger.
*/
protected void startThreads()
{
// start threads
readThread.start();
sortThread.start();
......@@ -355,14 +363,17 @@ public class UnilateralSortMerger<K extends Key, V extends Value> implements Sor
* @param ioManager
* The I/O manager
* @param ioMemorySize
* The amount of memory dedicatable to reading and writing.
* The amount of memory that is dedicated to reading and writing.
* @param parentTask
* The task at which the thread registers itself (for profiling purposes).
* @return The thread that does the spilling and pre-merging.
*/
protected ThreadBase getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues,
MemoryManager memoryManager, IOManager ioManager, int ioMemorySize, AbstractTask parentTask) {
return new SpillingThread(exceptionHandler, queues, memoryManager, ioManager, ioMemorySize, parentTask);
MemoryManager memoryManager, IOManager ioManager, int ioMemorySize, AbstractTask parentTask,
int buffersToKeepBeforeSpilling)
{
return new SpillingThread(exceptionHandler, queues, memoryManager, ioManager, ioMemorySize,
parentTask, buffersToKeepBeforeSpilling);
}
// ------------------------------------------------------------------------
......@@ -885,15 +896,20 @@ public class UnilateralSortMerger<K extends Key, V extends Value> implements Sor
private final IOManager ioManager;
private final int ioMemorySize;
private final int buffersToKeepBeforeSpilling;
public SpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues queues,
MemoryManager memoryManager, IOManager ioManager, int ioMemorySize, AbstractTask parentTask) {
MemoryManager memoryManager, IOManager ioManager, int ioMemorySize, AbstractTask parentTask,
int buffersToKeepBeforeSpilling)
{
super(exceptionHandler, "SortMerger spilling thread", queues, parentTask);
// members
this.memoryManager = memoryManager;
this.ioManager = ioManager;
this.ioMemorySize = ioMemorySize;
this.buffersToKeepBeforeSpilling = buffersToKeepBeforeSpilling;
}
/**
......@@ -908,6 +924,8 @@ public class UnilateralSortMerger<K extends Key, V extends Value> implements Sor
freeSegmentsAtShutdown(outputSegments);
CircularElement element = null;
// see whether we should keep some buffers
// loop as long as the thread is marked alive and we do not see the final
// element
......
......@@ -136,11 +136,6 @@ public class WordCount {
System.out.println(">>>>>>>>>> write (" + this + ") : " + pair.toString());
return (pair.getKey().toString() + ":" + pair.getValue().toString() + "\n").getBytes();
}
@Override
public KeyValuePair<Text, Integer> createPair() {
return new KeyValuePair<Text, Integer>(new Text(), new Integer());
}
}
public static class Mapper extends MapStub<Text, Text, Text, Integer> {
......
......@@ -94,11 +94,6 @@ public class WordCountMapReducePactMassiveTest extends TestBase {
public byte[] writeLine(KeyValuePair<Text, Integer> pair) {
return (pair.getKey().toString() + ":" + pair.getValue().toString() + "\n").getBytes();
}
@Override
public KeyValuePair<Text, Integer> createPair() {
return new KeyValuePair<Text, Integer>(new Text(), new Integer());
}
}
public static class Mapper extends MapStub<Text, Text, Text, Integer> {
......
......@@ -137,7 +137,8 @@
<configuration>
<source>1.6</source>
<target>1.6</target>
<compilerArgument>-g:none -O</compilerArgument>
<!-- High optimization, no debugging <compilerArgument>-g:none -O</compilerArgument> -->
<compilerArgument></compilerArgument>
</configuration>
</plugin>
<plugin>
......
......@@ -52,17 +52,17 @@
<property>
<key>jobmanager.instancemanager.local.type</key>
<value>standard,2,1,300,10,10</value>
<value>standard,2,1,300,10,0</value>
</property>
<property>
<key>jobmanager.instancemanager.cluster.type.1</key>
<value>standard,2,1,1024,10,10</value>
<value>standard,2,1,1024,20,0</value>
</property>
<property>
<key>jobmanager.instancemanager.cluster.type.2</key>
<value>medium,2,1,1024,20,20</value>
<value>medium,4,2,2048,40,0</value>
</property>
<property>
......
......@@ -49,7 +49,7 @@
user function on. -->
<property>
<key>pact.parallelization.default-instance-type</key>
<value>standard,2,1,300,10,10</value>
<value>standard,2,1,300,10,0</value>
</property>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册