提交 028fcf55 编写于 作者: S Stephan Ewen

[FLINK-1030] Refactor and clean up instance managers.

上级 e2fe9ee6
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.client.minicluster;
import java.lang.reflect.Method;
......@@ -30,9 +29,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.instance.HardwareDescriptionFactory;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
public class NepheleMiniCluster {
......@@ -296,7 +295,7 @@ public class NepheleMiniCluster {
config.setBoolean(ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY, defaultAlwaysCreateDirectory);
if (memorySize < 0){
memorySize = HardwareDescriptionFactory.extractFromSystem().getSizeOfFreeMemory();
memorySize = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
// at this time, we need to scale down the memory, because we cannot dedicate all free memory to the
// memory manager. we have to account for the buffer pools as well, and the job manager#s data structures
......
......@@ -39,7 +39,6 @@ import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
import org.apache.flink.runtime.topology.NetworkTopology;
import org.apache.flink.runtime.types.IntegerRecord;
import org.junit.Assert;
import org.junit.BeforeClass;
......@@ -207,11 +206,6 @@ public class CliFrontendListCancelTest {
throw new UnsupportedOperationException();
}
@Override
public NetworkTopology getNetworkTopology(JobID jobID) throws IOException {
throw new UnsupportedOperationException();
}
@Override
public int getAvailableSlots() {
return 1;
......
......@@ -55,6 +55,12 @@ public final class ConfigConstants {
*/
public static final String JOB_MANAGER_IPC_HANDLERS_KEY = "jobmanager.rpc.numhandler";
/**
* The config parameter defining the number of seconds that a task manager heartbeat may be missing before it is
* marked as failed.
*/
public static final String JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY = "jobmanager.max-heartbeat-delay-before-failure.sec";
/**
* The config parameter defining the task manager's IPC port from the configuration.
*/
......@@ -309,14 +315,22 @@ public final class ConfigConstants {
public static final int DEFAULT_JOB_MANAGER_IPC_HANDLERS = 8;
/**
* The default network port the task manager expects incoming IPC connections.
* Default number of seconds after which a task manager is marked as failed.
*/
// 30 seconds (its enough to get to mars, should be enough to detect failure)
public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30;
/**
* The default network port the task manager expects incoming IPC connections. The {@code -1} means that
* the TaskManager searches for a free port.
*/
public static final int DEFAULT_TASK_MANAGER_IPC_PORT = 6122;
public static final int DEFAULT_TASK_MANAGER_IPC_PORT = -1;
/**
* The default network port the task manager expects to receive transfer envelopes on.
* The default network port the task manager expects to receive transfer envelopes on. The {@code -1} means that
* the TaskManager searches for a free port.
*/
public static final int DEFAULT_TASK_MANAGER_DATA_PORT = 6121;
public static final int DEFAULT_TASK_MANAGER_DATA_PORT = -1;
/**
* The default directory for temporary files of the task manager.
......
......@@ -114,10 +114,11 @@ public class ConnectedComponents implements ProgramDescription {
} else {
result.print();
}
// execute program
env.execute("Connected Components Example");
// env.execute("Connected Components Example");
System.out.println(env.getExecutionPlan());
}
// *************************************************************************
......
......@@ -16,168 +16,83 @@
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.topology.NetworkNode;
import org.apache.flink.runtime.topology.NetworkTopology;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Collection;
import java.util.TimerTask;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
/**
* In Nephele an instance manager maintains the set of available compute resources. It is responsible for allocating new
* compute resources,
* provisioning available compute resources to the JobManager and keeping track of the availability of the utilized
* compute resources in order
* to report unexpected resource outages.
* A simple implementation of an {@link InstanceManager}.
*/
public class DefaultInstanceManager implements InstanceManager {
// ------------------------------------------------------------------------
// Internal Constants
// ------------------------------------------------------------------------
/**
* The log object used to report debugging and error information.
*/
private static final Logger LOG = LoggerFactory.getLogger(DefaultInstanceManager.class);
/**
* Default duration after which a host is purged in case it did not send
* a heart-beat message.
*/
private static final int DEFAULT_CLEANUP_INTERVAL = 2 * 60; // 2 min.
/**
* The key to retrieve the clean up interval from the configuration.
*/
private static final String CLEANUP_INTERVAL_KEY = "instancemanager.cluster.cleanupinterval";
// ------------------------------------------------------------------------
// Fields
// ------------------------------------------------------------------------
/** Global lock */
private final Object lock = new Object();
/**
* Duration after which a host is purged in case it did not send a
* heart-beat message.
*/
private final long cleanUpInterval;
/**
* Set of hosts known to run a task manager that are thus able to execute
* tasks.
*/
private final Map<InstanceConnectionInfo, Instance> registeredHosts;
/**
* The network topology of the cluster.
*/
private final NetworkTopology networkTopology;
/**
* Object that is notified if instances become available or vanish.
*/
private InstanceListener instanceListener;
private boolean shutdown;
/**
* Periodic task that checks whether hosts have not sent their heart-beat
* messages and purges the hosts in this case.
*/
private final TimerTask cleanupStaleMachines = new TimerTask() {
@Override
public void run() {
synchronized (DefaultInstanceManager.this.lock) {
final List<Map.Entry<InstanceConnectionInfo, Instance>> hostsToRemove =
new ArrayList<Map.Entry<InstanceConnectionInfo, Instance>>();
final Map<JobID, List<AllocatedResource>> staleResources = new HashMap<JobID, List<AllocatedResource>>();
// check all hosts whether they did not send heart-beat messages.
for (Map.Entry<InstanceConnectionInfo, Instance> entry : registeredHosts.entrySet()) {
final Instance host = entry.getValue();
if (!host.isStillAlive(cleanUpInterval)) {
// this host has not sent the heart-beat messages
// -> we terminate all instances running on this host and notify the jobs
final Collection<AllocatedSlot> slots = host.removeAllocatedSlots();
for (AllocatedSlot slot : slots) {
/** Set of hosts known to run a task manager that are thus able to execute tasks (by ID). */
private final Map<InstanceID, Instance> registeredHostsById;
final JobID jobID = slot.getJobID();
List<AllocatedResource> staleResourcesOfJob = staleResources.get(jobID);
if (staleResourcesOfJob == null) {
staleResourcesOfJob = new ArrayList<AllocatedResource>();
staleResources.put(jobID, staleResourcesOfJob);
}
staleResourcesOfJob.add(new AllocatedResource(host, slot.getAllocationID()));
}
hostsToRemove.add(entry);
LOG.info("Removing TaskManager "+entry.getValue().toString()+" due to inactivity for more than "+(cleanUpInterval / 1000 )+" seconds");
}
}
/** Set of hosts known to run a task manager that are thus able to execute tasks (by connection). */
private final Map<InstanceConnectionInfo, Instance> registeredHostsByConnection;
/** Set of hosts that were present once and have died */
private final Set<InstanceConnectionInfo> deadHosts;
registeredHosts.entrySet().removeAll(hostsToRemove);
/** Duration after which a task manager is considered dead if it did not send a heart-beat message. */
private final long heartbeatTimeout;
/** The total number of task slots that the system has */
private int totalNumberOfAliveTaskSlots;
final Iterator<Map.Entry<JobID, List<AllocatedResource>>> it = staleResources.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<JobID, List<AllocatedResource>> entry = it.next();
if (instanceListener != null) {
instanceListener.allocatedResourcesDied(entry.getKey(), entry.getValue());
}
}
}
}
};
/** Flag marking the system as shut down */
private volatile boolean shutdown;
// ------------------------------------------------------------------------
// Constructor and set-up
// ------------------------------------------------------------------------
/**
* Constructor.
* Creates an instance manager, using the global configuration value for maximum interval between heartbeats
* where a task manager is still considered alive.
*/
public DefaultInstanceManager() {
this.registeredHosts = new HashMap<InstanceConnectionInfo, Instance>();
long tmpCleanUpInterval = (long) GlobalConfiguration.getInteger(CLEANUP_INTERVAL_KEY, DEFAULT_CLEANUP_INTERVAL) * 1000;
if (tmpCleanUpInterval < 10) { // Clean up interval must be at least ten seconds
LOG.warn("Invalid clean up interval. Reverting to default cleanup interval of " + DEFAULT_CLEANUP_INTERVAL
+ " secs.");
tmpCleanUpInterval = DEFAULT_CLEANUP_INTERVAL;
this(1000 * GlobalConfiguration.getLong(
ConfigConstants.JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT));
}
public DefaultInstanceManager(long heartbeatTimeout) {
this(heartbeatTimeout, heartbeatTimeout);
}
public DefaultInstanceManager(long heartbeatTimeout, long cleanupInterval) {
if (heartbeatTimeout <= 0 || cleanupInterval <= 0) {
throw new IllegalArgumentException("Heartbeat timeout and cleanup interval must be positive.");
}
this.registeredHostsById = new HashMap<InstanceID, Instance>();
this.registeredHostsByConnection = new HashMap<InstanceConnectionInfo, Instance>();
this.deadHosts = new HashSet<InstanceConnectionInfo>();
this.heartbeatTimeout = heartbeatTimeout;
this.cleanUpInterval = tmpCleanUpInterval;
this.networkTopology = NetworkTopology.createEmptyTopology();
// look every BASEINTERVAL milliseconds for crashed hosts
final boolean runTimerAsDaemon = true;
new Timer(runTimerAsDaemon).schedule(cleanupStaleMachines, 1000, 1000);
new Timer(true).schedule(cleanupStaleMachines, cleanupInterval, cleanupInterval);
}
@Override
......@@ -186,220 +101,152 @@ public class DefaultInstanceManager implements InstanceManager {
if (this.shutdown) {
return;
}
this.shutdown = true;
this.cleanupStaleMachines.cancel();
Iterator<Instance> it = this.registeredHosts.values().iterator();
while (it.hasNext()) {
it.next().destroyProxies();
for (Instance i : this.registeredHostsById.values()) {
i.destroy();
}
this.registeredHosts.clear();
this.shutdown = true;
this.registeredHostsById.clear();
this.registeredHostsByConnection.clear();
this.deadHosts.clear();
this.totalNumberOfAliveTaskSlots = 0;
}
}
@Override
public void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException
{
synchronized (this.lock) {
// release the instance from the host
final Instance clusterInstance = allocatedResource.getInstance();
clusterInstance.releaseSlot(allocatedResource.getAllocationID());
public boolean reportHeartBeat(InstanceID instanceId) {
if (instanceId == null) {
throw new IllegalArgumentException("InstanceID may not be null.");
}
}
/**
* Creates a new {@link Instance} object to manage instances that can
* be executed on that host.
*
* @param instanceConnectionInfo
* the connection information for the instance
* @param hardwareDescription
* the hardware description provided by the new instance
* @param numberOfSlots
* number of slots available on the instance
* @return a new {@link Instance} object or <code>null</code> if the cluster instance could not be created
*/
private Instance createNewHost(final InstanceConnectionInfo instanceConnectionInfo,
final HardwareDescription hardwareDescription, int numberOfSlots) {
// Try to match new host with a stub host from the existing topology
String instanceName = instanceConnectionInfo.hostname();
NetworkNode parentNode = this.networkTopology.getRootNode();
NetworkNode currentStubNode = null;
// Try to match new host using the host name
while (true) {
currentStubNode = this.networkTopology.getNodeByName(instanceName);
if (currentStubNode != null) {
break;
}
final int pos = instanceName.lastIndexOf('.');
if (pos == -1) {
break;
}
/*
* If host name is reported as FQDN, iterative remove parts
* of the domain name until a match occurs or no more dots
* can be found in the host name.
*/
instanceName = instanceName.substring(0, pos);
}
// Try to match the new host using the IP address
if (currentStubNode == null) {
instanceName = instanceConnectionInfo.address().toString();
instanceName = instanceName.replaceAll("/", ""); // Remove any / characters
currentStubNode = this.networkTopology.getNodeByName(instanceName);
}
if (currentStubNode != null) {
/*
* The instance name will be the same as the one of the stub node. That way
* the stub now will be removed from the network topology and replaced be
* the new node.
*/
if (currentStubNode.getParentNode() != null) {
parentNode = currentStubNode.getParentNode();
}
// Remove the stub node from the tree
currentStubNode.remove();
}
LOG.info("Creating instance for " + instanceConnectionInfo + ", parent is "
+ parentNode.getName());
final Instance host = new Instance(instanceConnectionInfo, parentNode,
this.networkTopology, hardwareDescription, numberOfSlots);
return host;
}
@Override
public void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo) {
synchronized (this.lock) {
Instance host = registeredHosts.get(instanceConnectionInfo);
if(host == null){
LOG.error("Task manager with connection info " + instanceConnectionInfo + " has not been registered.");
return;
if (this.shutdown) {
throw new IllegalStateException("InstanceManager is shut down.");
}
Instance host = registeredHostsById.get(instanceId);
if (host == null){
if (LOG.isDebugEnabled()) {
LOG.debug("Received hearbeat from unknown TaskManager with instance ID " + instanceId.toString() +
" Possibly TaskManager was maked as dead (timed-out) earlier. " +
"Reporting back that task manager is no longer known.");
}
return false;
}
host.reportHeartBeat();
return true;
}
}
@Override
public void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
HardwareDescription hardwareDescription, int numberOfSlots){
public InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription resources, int numberOfSlots){
synchronized(this.lock){
if(registeredHosts.containsKey(instanceConnectionInfo)){
LOG.error("Task manager with connection info " + instanceConnectionInfo + " has already been " +
"registered.");
return;
if (this.shutdown) {
throw new IllegalStateException("InstanceManager is shut down.");
}
Instance host = createNewHost(instanceConnectionInfo, hardwareDescription, numberOfSlots);
if(host == null){
LOG.error("Could not create a new host object for register task manager for connection info " +
instanceConnectionInfo);
return;
Instance prior = registeredHostsByConnection.get(instanceConnectionInfo);
if (prior != null) {
LOG.error("Registration attempt from TaskManager with connection info " + instanceConnectionInfo +
". This connection is already registered under ID " + prior.getId());
return null;
}
this.registeredHosts.put(instanceConnectionInfo, host);
LOG.info("New number of registered hosts is " + this.registeredHosts.size());
host.reportHeartBeat();
}
}
@Override
public void requestInstance(JobID jobID, Configuration conf, int requiredSlots)
throws InstanceException
{
synchronized(this.lock) {
Iterator<Instance> clusterIterator = this.registeredHosts.values().iterator();
Instance instance = null;
List<AllocatedResource> allocatedResources = new ArrayList<AllocatedResource>();
int allocatedSlots = 0;
while(clusterIterator.hasNext()) {
instance = clusterIterator.next();
while(instance.getNumberOfAvailableSlots() >0 && allocatedSlots < requiredSlots){
AllocatedResource resource = instance.allocateSlot(jobID);
allocatedResources.add(resource);
allocatedSlots++;
}
boolean wasDead = this.deadHosts.remove(instanceConnectionInfo);
if (wasDead) {
LOG.info("Registering TaskManager with connection info " + instanceConnectionInfo +
" which was marked as dead earlier because of a heart-beat timeout.");
}
if(allocatedSlots < requiredSlots){
throw new InstanceException("Cannot allocate the required number of slots: " + requiredSlots + ".");
InstanceID id = null;
do {
id = new InstanceID();
} while (registeredHostsById.containsKey(id));
Instance host = new Instance(instanceConnectionInfo, id, resources, numberOfSlots);
registeredHostsById.put(id, host);
registeredHostsByConnection.put(instanceConnectionInfo, host);
totalNumberOfAliveTaskSlots += numberOfSlots;
if (LOG.isInfoEnabled()) {
LOG.info(String.format("Registered TaskManager at %s as %s. Current number of registered hosts is %d.",
instanceConnectionInfo, id, registeredHostsById.size()));
}
if (this.instanceListener != null) {
final InstanceNotifier instanceNotifier = new InstanceNotifier(
this.instanceListener, jobID, allocatedResources);
instanceNotifier.start();
}
host.reportHeartBeat();
return id;
}
}
@Override
public NetworkTopology getNetworkTopology(JobID jobID) {
return this.networkTopology;
public int getNumberOfRegisteredTaskManagers() {
return this.registeredHostsById.size();
}
@Override
public void setInstanceListener(InstanceListener instanceListener) {
synchronized (this.lock) {
this.instanceListener = instanceListener;
}
public int getTotalNumberOfSlots() {
return this.totalNumberOfAliveTaskSlots;
}
@Override
public Instance getInstanceByName(String name) {
if (name == null) {
throw new IllegalArgumentException("Argument name must not be null");
}
synchronized (this.lock) {
final Iterator<Instance> it = this.registeredHosts.values().iterator();
while (it.hasNext()) {
final Instance instance = it.next();
if (name.equals(instance.getName())) {
return instance;
}
}
}
return null;
public Map<InstanceID, Instance> getAllRegisteredInstances() {
return this.registeredHostsById;
}
// --------------------------------------------------------------------------------------------
/**
* Periodic task that checks whether hosts have not sent their heart-beat
* messages and purges the hosts in this case.
*/
private final TimerTask cleanupStaleMachines = new TimerTask() {
@Override
public int getNumberOfTaskManagers() {
return this.registeredHosts.size();
}
@Override
public void run() {
@Override
public int getNumberOfSlots() {
int slots = 0;
final long now = System.currentTimeMillis();
final long timeout = DefaultInstanceManager.this.heartbeatTimeout;
synchronized (DefaultInstanceManager.this.lock) {
if (DefaultInstanceManager.this.shutdown) {
return;
}
for(Instance instance: registeredHosts.values()){
slots += instance.getNumberOfSlots();
final Iterator<Map.Entry<InstanceID, Instance>> entries = registeredHostsById.entrySet().iterator();
// check all hosts whether they did not send heart-beat messages.
while (entries.hasNext()) {
final Map.Entry<InstanceID, Instance> entry = entries.next();
final Instance host = entry.getValue();
if (!host.isStillAlive(now, timeout)) {
// remove from the living
entries.remove();
registeredHostsByConnection.remove(host.getInstanceConnectionInfo());
// add to the dead
deadHosts.add(host.getInstanceConnectionInfo());
host.markDied();
totalNumberOfAliveTaskSlots -= host.getNumberOfSlots();
LOG.info(String.format("TaskManager %s at %s did not report a heartbeat for %d msecs - marking as dead. Current number of registered hosts is %d.",
host.getId(), host.getInstanceConnectionInfo(), heartbeatTimeout, registeredHostsById.size()));
}
}
}
}
return slots;
}
@Override
public Map<InstanceConnectionInfo, Instance> getInstances() {
return this.registeredHosts;
}
};
}
......@@ -16,15 +16,245 @@
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.util.OperatingSystem;
/**
* Convenience class to extract hardware specifics of the computer executing this class
*/
public class Hardware {
private static final Log LOG = LogFactory.getLog(Hardware.class);
private static final String LINUX_MEMORY_INFO_PATH = "/proc/meminfo";
private static final Pattern LINUX_MEMORY_REGEX = Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");
/**
* Gets the number of CPU cores (hardware contexts) that the JVM has access to.
*
* @return The number of CPU cores.
*/
public static int getNumberCPUCores() {
return Runtime.getRuntime().availableProcessors();
}
/**
* Returns the size of the physical memory in bytes.
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
public static long getSizeOfPhysicalMemory() {
switch (OperatingSystem.getCurrentOperatingSystem()) {
case LINUX:
return getSizeOfPhysicalMemoryForLinux();
case WINDOWS:
return getSizeOfPhysicalMemoryForWindows();
case MAC_OS:
return getSizeOfPhysicalMemoryForMac();
case FREE_BSD:
return getSizeOfPhysicalMemoryForFreeBSD();
case UNKNOWN:
LOG.error("Cannot determine size of physical memory for unknown operating system");
return -1;
default:
LOG.error("Unrecognized OS");
return -1;
}
}
/**
* Returns the size of the physical memory in bytes on a Linux-based
* operating system.
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
@SuppressWarnings("resource")
private static long getSizeOfPhysicalMemoryForLinux() {
BufferedReader lineReader = null;
try {
lineReader = new BufferedReader(new FileReader(LINUX_MEMORY_INFO_PATH));
String line = null;
while ((line = lineReader.readLine()) != null) {
Matcher matcher = LINUX_MEMORY_REGEX.matcher(line);
if (matcher.matches()) {
String totalMemory = matcher.group(1);
return Long.parseLong(totalMemory) * 1024L; // Convert from kilobyte to byte
}
}
// expected line did not come
LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). Unexpected format.");
return -1;
}
catch (NumberFormatException e) {
LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'). Unexpected format.");
return -1;
}
catch (Throwable t) {
LOG.error("Cannot determine the size of the physical memory for Linux host (using '/proc/meminfo'): " + t.getMessage(), t);
return -1;
}
finally {
// Make sure we always close the file handle
try {
if (lineReader != null) {
lineReader.close();
}
} catch (Throwable t) {}
}
}
/**
* Returns the size of the physical memory in bytes on a Mac OS-based
* operating system
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
private static long getSizeOfPhysicalMemoryForMac() {
BufferedReader bi = null;
try {
Process proc = Runtime.getRuntime().exec("sysctl hw.memsize");
bi = new BufferedReader(
new InputStreamReader(proc.getInputStream()));
String line;
while ((line = bi.readLine()) != null) {
if (line.startsWith("hw.memsize")) {
long memsize = Long.parseLong(line.split(":")[1].trim());
bi.close();
proc.destroy();
return memsize;
}
}
} catch (Throwable t) {
LOG.error("Cannot determine physical memory of machine for MacOS host: " + t.getMessage(), t);
return -1;
} finally {
if (bi != null) {
try {
bi.close();
} catch (IOException ioe) {
}
}
}
return -1;
}
/**
* Returns the size of the physical memory in bytes on FreeBSD.
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
private static long getSizeOfPhysicalMemoryForFreeBSD() {
BufferedReader bi = null;
try {
Process proc = Runtime.getRuntime().exec("sysctl hw.physmem");
bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line;
while ((line = bi.readLine()) != null) {
if (line.startsWith("hw.physmem")) {
long memsize = Long.parseLong(line.split(":")[1].trim());
bi.close();
proc.destroy();
return memsize;
}
}
LOG.error("Cannot determine the size of the physical memory for FreeBSD host (using 'sysctl hw.physmem').");
return -1;
}
catch (Throwable t) {
LOG.error("Cannot determine the size of the physical memory for FreeBSD host (using 'sysctl hw.physmem'): " + t.getMessage(), t);
return -1;
}
finally {
if (bi != null) {
try {
bi.close();
} catch (IOException ioe) {
}
}
}
}
/**
* Returns the size of the physical memory in bytes on Windows.
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
private static long getSizeOfPhysicalMemoryForWindows() {
BufferedReader bi = null;
try {
Process proc = Runtime.getRuntime().exec("wmic memorychip get capacity");
bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line = bi.readLine();
if (line == null) {
return -1L;
}
if (!line.startsWith("Capacity")) {
return -1L;
}
long sizeOfPhyiscalMemory = 0L;
while ((line = bi.readLine()) != null) {
if (line.isEmpty()) {
continue;
}
line = line.replaceAll(" ", "");
sizeOfPhyiscalMemory += Long.parseLong(line);
}
return sizeOfPhyiscalMemory;
}
catch (Throwable t) {
LOG.error("Cannot determine the size of the physical memory for Windows host (using 'wmic memorychip'): " + t.getMessage(), t);
return -1L;
}
finally {
if (bi != null) {
try {
bi.close();
} catch (Throwable t) {}
}
}
}
// --------------------------------------------------------------------------------------------
private Hardware() {}
}
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import java.io.IOException;
......@@ -26,67 +25,43 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
/**
* A hardware description reflects the hardware environment which is actually present on the task manager's compute
* nodes. Unlike the {@link InstanceType} the hardware description is determined by the compute node itself and not
* loaded from a predefined configuration profile. In particular, the hardware description includes the size of free
* memory which is actually available to the JVM and can be used to allocate large memory portions.
* <p>
* This class is thread-safe.
*
* A hardware description describes the resources available to a task manager.
*/
public final class HardwareDescription implements IOReadableWritable {
public final class HardwareDescription implements IOReadableWritable, java.io.Serializable {
/**
* The number of CPU cores available to the JVM on the compute node.
*/
private int numberOfCPUCores = 0;
private static final long serialVersionUID = 3380016608300325361L;
/**
* The size of physical memory in bytes available on the compute node.
*/
private long sizeOfPhysicalMemory = 0;
/** The number of CPU cores available to the JVM on the compute node. */
private int numberOfCPUCores;
/**
* The size of free memory in bytes available to the JVM on the compute node.
*/
private long sizeOfFreeMemory = 0;
/** The size of physical memory in bytes available on the compute node. */
private long sizeOfPhysicalMemory;
/** The size of the JVM heap memory */
private long sizeOfJvmHeap;
/** The size of the memory managed by the system for caching, hashing, sorting, ... */
private long sizeOfManagedMemory;
/**
* Public default constructor used for serialization process.
*/
public HardwareDescription() {
}
public HardwareDescription() {}
/**
* Constructs a new hardware description object.
*
* @param numberOfCPUCores
* the number of CPU cores available to the JVM on the compute node
* @param sizeOfPhysicalMemory
* the size of physical memory in bytes available on the compute node
* @param sizeOfFreeMemory
* the size of free memory in bytes available to the JVM on the compute node
* @param numberOfCPUCores The number of CPU cores available to the JVM on the compute node.
* @param sizeOfPhysicalMemory The size of physical memory in bytes available on the compute node.
* @param sizeOfJvmHeap The size of the JVM heap memory.
* @param sizeOfManagedMemory The size of the memory managed by the system for caching, hashing, sorting, ...
*/
HardwareDescription(final int numberOfCPUCores, final long sizeOfPhysicalMemory, final long sizeOfFreeMemory) {
public HardwareDescription(int numberOfCPUCores, long sizeOfPhysicalMemory, long sizeOfJvmHeap, long sizeOfManagedMemory) {
this.numberOfCPUCores = numberOfCPUCores;
this.sizeOfPhysicalMemory = sizeOfPhysicalMemory;
this.sizeOfFreeMemory = sizeOfFreeMemory;
}
@Override
public void write(final DataOutputView out) throws IOException {
out.writeInt(this.numberOfCPUCores);
out.writeLong(this.sizeOfPhysicalMemory);
out.writeLong(this.sizeOfFreeMemory);
}
@Override
public void read(final DataInputView in) throws IOException {
this.numberOfCPUCores = in.readInt();
this.sizeOfPhysicalMemory = in.readLong();
this.sizeOfFreeMemory = in.readLong();
this.sizeOfJvmHeap = sizeOfJvmHeap;
this.sizeOfManagedMemory = sizeOfManagedMemory;
}
/**
......@@ -108,11 +83,52 @@ public final class HardwareDescription implements IOReadableWritable {
}
/**
* Returns the size of free memory in bytes available to the JVM on the compute node.
* Returns the size of the JVM heap memory
*
* @return the size of free memory in bytes available to the JVM on the compute node
* @return The size of the JVM heap memory
*/
public long getSizeOfFreeMemory() {
return this.sizeOfFreeMemory;
public long getSizeOfJvmHeap() {
return this.sizeOfJvmHeap;
}
/**
* Returns the size of the memory managed by the system for caching, hashing, sorting, ...
*
* @return The size of the memory managed by the system.
*/
public long getSizeOfManagedMemory() {
return this.sizeOfManagedMemory;
}
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
@Override
public void write(DataOutputView out) throws IOException {
out.writeInt(this.numberOfCPUCores);
out.writeLong(this.sizeOfPhysicalMemory);
out.writeLong(this.sizeOfJvmHeap);
out.writeLong(this.sizeOfManagedMemory);
}
@Override
public void read(DataInputView in) throws IOException {
this.numberOfCPUCores = in.readInt();
this.sizeOfPhysicalMemory = in.readLong();
this.sizeOfJvmHeap = in.readLong();
this.sizeOfManagedMemory = in.readLong();
}
// --------------------------------------------------------------------------------------------
// Factory
// --------------------------------------------------------------------------------------------
public static HardwareDescription extractFromSystem(long managedMemory) {
final int numberOfCPUCores = Hardware.getNumberCPUCores();
final long sizeOfJvmHeap = Runtime.getRuntime().maxMemory();
final long sizeOfPhysicalMemory = Hardware.getSizeOfPhysicalMemory();
return new HardwareDescription(numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, managedMemory);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.util.OperatingSystem;
/**
* A factory to construct {@link HardwareDescription} objects. In particular,
* the factory can automatically generate a {@link HardwareDescription} object
* from the system it is executed on.
* <p>
* This class is thread-safe.
*/
public class HardwareDescriptionFactory {
/**
* The log object used to report errors.
*/
private static final Logger LOG = LoggerFactory.getLogger(HardwareDescriptionFactory.class);
/**
* The path to the interface to extract memory information under Linux.
*/
private static final String LINUX_MEMORY_INFO_PATH = "/proc/meminfo";
/**
* The regular expression used to extract the size of the physical memory
* under Linux.
*/
private static final Pattern LINUX_MEMORY_REGEX = Pattern.compile("^MemTotal:\\s*(\\d+)\\s+kB$");
/**
* Private constructor, so class cannot be instantiated.
*/
private HardwareDescriptionFactory() {}
/**
* Extracts a hardware description object from the system.
*
* @return the hardware description object or <code>null</code> if at least
* one value for the hardware description cannot be determined
*/
public static HardwareDescription extractFromSystem() {
int numberOfCPUCores = Runtime.getRuntime().availableProcessors();
long sizeOfPhysicalMemory = getSizeOfPhysicalMemory();
if (sizeOfPhysicalMemory < 0) {
sizeOfPhysicalMemory = 1;
}
long sizeOfFreeMemory = getSizeOfFreeMemory();
return new HardwareDescription(numberOfCPUCores, sizeOfPhysicalMemory, sizeOfFreeMemory);
}
/**
* Constructs a new hardware description object.
*
* @param numberOfCPUCores
* the number of CPU cores available to the JVM on the compute
* node
* @param sizeOfPhysicalMemory
* the size of physical memory in bytes available on the compute
* node
* @param sizeOfFreeMemory
* the size of free memory in bytes available to the JVM on the
* compute node
* @return the hardware description object
*/
public static HardwareDescription construct(int numberOfCPUCores,long sizeOfPhysicalMemory, long sizeOfFreeMemory) {
return new HardwareDescription(numberOfCPUCores, sizeOfPhysicalMemory, sizeOfFreeMemory);
}
/**
* Returns the size of free memory in bytes available to the JVM.
*
* @return the size of the free memory in bytes available to the JVM or <code>-1</code> if the size cannot be
* determined
*/
private static long getSizeOfFreeMemory() {
Runtime r = Runtime.getRuntime();
long available = r.maxMemory() - r.totalMemory() + r.freeMemory();
return available;
}
/**
* Returns the size of the physical memory in bytes.
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
private static long getSizeOfPhysicalMemory() {
switch (OperatingSystem.getCurrentOperatingSystem()) {
case LINUX:
return getSizeOfPhysicalMemoryForLinux();
case WINDOWS:
return getSizeOfPhysicalMemoryForWindows();
case MAC_OS:
return getSizeOfPhysicalMemoryForMac();
case FREE_BSD:
return getSizeOfPhysicalMemoryForFreeBSD();
case UNKNOWN:
LOG.error("Cannot determine size of physical memory for unknown operating system");
return -1;
default:
LOG.error("Unrecognized OS");
return -1;
}
}
/**
* Returns the size of the physical memory in bytes on a Linux-based
* operating system.
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
@SuppressWarnings("resource")
private static long getSizeOfPhysicalMemoryForLinux() {
BufferedReader lineReader = null;
try {
lineReader = new BufferedReader(new FileReader(LINUX_MEMORY_INFO_PATH));
String line = null;
while ((line = lineReader.readLine()) != null) {
Matcher matcher = LINUX_MEMORY_REGEX.matcher(line);
if (matcher.matches()) {
String totalMemory = matcher.group(1);
return Long.parseLong(totalMemory) * 1024L; // Convert from kilobyte to byte
}
}
// expected line did not come
LOG.error("Cannot determine the size of the physical memory using '/proc/meminfo'. Unexpected format.");
return -1;
}
catch (NumberFormatException e) {
LOG.error("Cannot determine the size of the physical memory using '/proc/meminfo'. Unexpected format.");
return -1;
}
catch (IOException e) {
LOG.error("Cannot determine the size of the physical memory using '/proc/meminfo': " + e.getMessage(), e);
return -1;
}
finally {
// Make sure we always close the file handle
try {
if (lineReader != null) {
lineReader.close();
}
} catch (Throwable t) {}
}
}
/**
* Returns the size of the physical memory in bytes on a Mac OS-based
* operating system
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
private static long getSizeOfPhysicalMemoryForMac() {
BufferedReader bi = null;
try {
Process proc = Runtime.getRuntime().exec("sysctl hw.memsize");
bi = new BufferedReader(
new InputStreamReader(proc.getInputStream()));
String line;
while ((line = bi.readLine()) != null) {
if (line.startsWith("hw.memsize")) {
long memsize = Long.parseLong(line.split(":")[1].trim());
bi.close();
proc.destroy();
return memsize;
}
}
} catch (Exception e) {
LOG.error("Exception while retrieving size of physical of memory on mac.", e);
return -1;
} finally {
if (bi != null) {
try {
bi.close();
} catch (IOException ioe) {
}
}
}
return -1;
}
/**
* Returns the size of the physical memory in bytes on FreeBSD.
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
private static long getSizeOfPhysicalMemoryForFreeBSD() {
BufferedReader bi = null;
try {
Process proc = Runtime.getRuntime().exec("sysctl hw.physmem");
bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line;
while ((line = bi.readLine()) != null) {
if (line.startsWith("hw.physmem")) {
long memsize = Long.parseLong(line.split(":")[1].trim());
bi.close();
proc.destroy();
return memsize;
}
}
LOG.error("Cannot determine the size of the physical memory using 'sysctl hw.physmem'.");
return -1;
}
catch (Exception e) {
LOG.error("Cannot determine the size of the physical memory using 'sysctl hw.physmem': " + e.getMessage(), e);
return -1;
}
finally {
if (bi != null) {
try {
bi.close();
} catch (IOException ioe) {
}
}
}
}
/**
* Returns the size of the physical memory in bytes on Windows.
*
* @return the size of the physical memory in bytes or <code>-1</code> if
* the size could not be determined
*/
private static long getSizeOfPhysicalMemoryForWindows() {
BufferedReader bi = null;
try {
Process proc = Runtime.getRuntime().exec("wmic memorychip get capacity");
bi = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line = bi.readLine();
if (line == null) {
return -1L;
}
if (!line.startsWith("Capacity")) {
return -1L;
}
long sizeOfPhyiscalMemory = 0L;
while ((line = bi.readLine()) != null) {
if (line.isEmpty()) {
continue;
}
line = line.replaceAll(" ", "");
sizeOfPhyiscalMemory += Long.parseLong(line);
}
return sizeOfPhyiscalMemory;
}
catch (Exception e) {
LOG.error("Cannot determine the size of the physical memory using 'wmic memorychip': " + e.getMessage(), e);
return -1L;
}
finally {
if (bi != null) {
try {
bi.close();
} catch (Throwable t) {}
}
}
}
}
......@@ -42,27 +42,22 @@ import org.apache.flink.runtime.protocols.TaskOperationProtocol;
import org.apache.flink.runtime.taskmanager.TaskCancelResult;
import org.apache.flink.runtime.taskmanager.TaskKillResult;
import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
import org.apache.flink.runtime.topology.NetworkNode;
import org.apache.flink.runtime.topology.NetworkTopology;
/**
* An instance represents a resource a {@link org.apache.flink.runtime.taskmanager.TaskManager} runs on.
*
*/
public class Instance extends NetworkNode {
/**
* The connection info identifying the instance.
*/
public class Instance {
/** The connection info to connect to the task manager represented by this instance. */
private final InstanceConnectionInfo instanceConnectionInfo;
/**
* The hardware description as reported by the instance itself.
*/
private final HardwareDescription hardwareDescription;
/**
* Number of slots available on the node
*/
/** A description of the resources of the task manager */
private final HardwareDescription resources;
/** The ID identifying the instance. */
private final InstanceID instanceId;
/** The number of task slots available on the node */
private final int numberOfSlots;
/**
......@@ -78,26 +73,20 @@ public class Instance extends NetworkNode {
/**
* Time when last heat beat has been received from the task manager running on this instance.
*/
private long lastReceivedHeartBeat = System.currentTimeMillis();
private volatile long lastReceivedHeartBeat = System.currentTimeMillis();
/**
* Constructs an abstract instance object.
*
* @param instanceConnectionInfo
* the connection info identifying the instance
* @param parentNode
* the parent node in the network topology
* @param networkTopology
* the network topology this node is a part of
* @param hardwareDescription
* the hardware description provided by the instance itself
* @param instanceConnectionInfo The connection info under which to reach the TaskManager instance.
* @param id The id under which the instance is registered.
* @param resources The resources available on the machine.
* @param numberOfSlots The number of task slots offered by this instance.
*/
public Instance(final InstanceConnectionInfo instanceConnectionInfo,
final NetworkNode parentNode, final NetworkTopology networkTopology,
final HardwareDescription hardwareDescription, int numberOfSlots) {
super((instanceConnectionInfo == null) ? null : instanceConnectionInfo.toString(), parentNode, networkTopology);
public Instance(InstanceConnectionInfo instanceConnectionInfo, InstanceID id, HardwareDescription resources, int numberOfSlots) {
this.instanceConnectionInfo = instanceConnectionInfo;
this.hardwareDescription = hardwareDescription;
this.instanceId = id;
this.resources = resources;
this.numberOfSlots = numberOfSlots;
}
......@@ -140,15 +129,6 @@ public class Instance extends NetworkNode {
return this.instanceConnectionInfo;
}
/**
* Returns the instance's hardware description as reported by the instance itself.
*
* @return the instance's hardware description
*/
public HardwareDescription getHardwareDescription() {
return this.hardwareDescription;
}
/**
* Checks if all the libraries required to run the job with the given
* job ID are available on this instance. Any libary that is missing
......@@ -193,9 +173,7 @@ public class Instance extends NetworkNode {
* @throws IOException
* thrown if an error occurs while transmitting the task
*/
public synchronized List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks)
throws IOException {
public synchronized List<TaskSubmissionResult> submitTasks(final List<TaskDeploymentDescriptor> tasks) throws IOException {
return getTaskManagerProxy().submitTasks(tasks);
}
......@@ -232,25 +210,12 @@ public class Instance extends NetworkNode {
/**
* Updates the time of last received heart beat to the current system time.
*/
public synchronized void reportHeartBeat() {
public void reportHeartBeat() {
this.lastReceivedHeartBeat = System.currentTimeMillis();
}
/**
* Returns whether the host is still alive.
*
* @param cleanUpInterval
* duration (in milliseconds) after which a host is
* considered dead if it has no received heat-beats.
* @return <code>true</code> if the host has received a heat-beat before the <code>cleanUpInterval</code> duration
* has expired, <code>false</code> otherwise
*/
public synchronized boolean isStillAlive(final long cleanUpInterval) {
if (this.lastReceivedHeartBeat + cleanUpInterval < System.currentTimeMillis()) {
return false;
}
return true;
public boolean isStillAlive(long now, long cleanUpInterval) {
return this.lastReceivedHeartBeat + cleanUpInterval > now;
}
......@@ -369,4 +334,21 @@ public class Instance extends NetworkNode {
public long getLastHeartBeat() {
return this.lastReceivedHeartBeat;
}
public void markDied() {
}
public void destroy() {
}
public InstanceID getId() {
return instanceId;
}
public HardwareDescription getResources() {
return this.resources;
}
}
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import java.io.IOException;
......@@ -27,13 +26,13 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.StringUtils;
/**
* This class encapsulates all connection information necessary to connect to the instance's task manager.
*
*/
public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo> {
public class InstanceConnectionInfo implements IOReadableWritable, Comparable<InstanceConnectionInfo>, java.io.Serializable {
private static final long serialVersionUID = -8254407801276350716L;
/**
* The network address the instance's task manager binds its sockets to.
......@@ -55,14 +54,10 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
*/
private String hostName;
/**
* The domain name of the instance.
*/
private String domainName;
/**
* Constructs a new instance connection info object. The constructor will attempt to retrieve the instance's
* hostname and domain name through the operating system's lookup mechanisms.
* host name and domain name through the operating system's lookup mechanisms.
*
* @param inetAddress
* the network address the instance's task manager binds its sockets to
......@@ -85,73 +80,13 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
throw new IllegalArgumentException("Argument dataPort must be greater than zero");
}
this.inetAddress = inetAddress;
final String hostAddStr = inetAddress.getHostAddress();
final String fqdn = inetAddress.getCanonicalHostName();
if (hostAddStr.equals(fqdn)) {
this.hostName = fqdn;
this.domainName = null;
} else {
// Look for the first dot in the FQDN
final int firstDot = fqdn.indexOf('.');
if (firstDot == -1) {
this.hostName = fqdn;
this.domainName = null;
} else {
this.hostName = fqdn.substring(0, firstDot);
this.domainName = fqdn.substring(firstDot + 1);
}
}
this.ipcPort = ipcPort;
this.dataPort = dataPort;
}
/**
* Constructs a new instance connection info object.
*
* @param inetAddress
* the network address the instance's task manager binds its sockets to
* @param hostName
* the host name of the instance
* @param domainName
* the domain name of the instance
* @param ipcPort
* the port instance's task manager runs its IPC service on
* @param dataPort
* the port instance's task manager expects to receive transfer envelopes on.
*/
public InstanceConnectionInfo(final InetAddress inetAddress, final String hostName, final String domainName,
final int ipcPort, final int dataPort) {
if (inetAddress == null) {
throw new IllegalArgumentException("Argument inetAddress must not be null");
}
if (hostName == null) {
throw new IllegalArgumentException("Argument hostName must not be null");
}
if (ipcPort <= 0) {
throw new IllegalArgumentException("Argument ipcPort must be greater than zero");
}
if (dataPort <= 0) {
throw new IllegalArgumentException("Argument dataPort must be greater than zero");
}
this.inetAddress = inetAddress;
this.hostName = hostName;
this.domainName = domainName;
this.ipcPort = ipcPort;
this.dataPort = dataPort;
}
/**
* Constructs an empty {@link InstanceConnectionInfo} object.
* Constructs an empty object.
*/
public InstanceConnectionInfo() {}
......@@ -161,7 +96,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the port instance's task manager runs its IPC service on
*/
public int ipcPort() {
return this.ipcPort;
}
......@@ -171,7 +105,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the port instance's task manager expects to receive transfer envelopes on
*/
public int dataPort() {
return this.dataPort;
}
......@@ -181,7 +114,6 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the network address the instance's task manager binds its sockets to
*/
public InetAddress address() {
return this.inetAddress;
}
......@@ -192,110 +124,125 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the host name of the instance
*/
public String hostname() {
if (this.hostName == null) {
try {
this.hostName = this.inetAddress.getCanonicalHostName();
} catch (Throwable t) {
// could not determine host name, so take IP textual representation
this.hostName = inetAddress.getHostAddress();
}
}
return this.hostName;
}
/**
* Returns the domain name of the instance.
*
* @return the domain name of the instance or <code>null</code> if the domain name could not be determined
*/
public String domainName() {
return this.domainName;
}
public String getInetAdress() {
return this.inetAddress.toString();
}
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
@Override
public void read(final DataInputView in) throws IOException {
public void read(DataInputView in) throws IOException {
final int addr_length = in.readInt();
byte[] address = new byte[addr_length];
in.readFully(address);
this.hostName = StringRecord.readString(in);
this.domainName = StringRecord.readString(in);
this.ipcPort = in.readInt();
this.dataPort = in.readInt();
if (in.readBoolean()) {
this.hostName = StringRecord.readString(in);
}
try {
this.inetAddress = InetAddress.getByAddress(address);
} catch (UnknownHostException uhe) {
throw new IOException(StringUtils.stringifyException(uhe));
} catch (UnknownHostException e) {
throw new IOException("This lookup should never fail.", e);
}
this.ipcPort = in.readInt();
this.dataPort = in.readInt();
}
@Override
public void write(final DataOutputView out) throws IOException {
out.writeInt(this.inetAddress.getAddress().length);
out.write(this.inetAddress.getAddress());
StringRecord.writeString(out, this.hostName);
StringRecord.writeString(out, this.domainName);
out.writeInt(this.ipcPort);
out.writeInt(this.dataPort);
}
@Override
public String toString() {
String iaString;
String portsString = " (ipcPort="+ipcPort+", dataPort="+dataPort+")";
if (this.hostName != null) {
iaString = this.hostName+portsString;
out.writeBoolean(true);
StringRecord.writeString(out, this.hostName);
} else {
iaString = inetAddress.toString();
iaString = iaString.replace("/", "");
iaString += portsString;
out.writeBoolean(false);
}
return iaString;
}
// --------------------------------------------------------------------------------------------
// Serialization
// --------------------------------------------------------------------------------------------
@Override
public boolean equals(final Object obj) {
public String toString() {
return hostname() + " (ipcPort=" + ipcPort + ", dataPort=" + dataPort + ")";
}
@Override
public boolean equals(Object obj) {
if (obj instanceof InstanceConnectionInfo) {
InstanceConnectionInfo ici = (InstanceConnectionInfo) obj;
if (!this.inetAddress.equals(ici.address())) {
return false;
}
if (this.ipcPort != ici.ipcPort()) {
return false;
}
if (this.dataPort != ici.dataPort()) {
return false;
}
return true;
InstanceConnectionInfo other = (InstanceConnectionInfo) obj;
return this.ipcPort == other.ipcPort &&
this.dataPort == other.dataPort &&
this.inetAddress.equals(other.inetAddress);
} else {
return false;
}
return false;
}
@Override
public int hashCode() {
return this.inetAddress.hashCode();
return this.inetAddress.hashCode() +
17*ipcPort +
23*dataPort;
}
@Override
public int compareTo(final InstanceConnectionInfo o) {
return this.address().getHostName()
.compareTo(((InstanceConnectionInfo) o).address().getHostName());
public int compareTo(InstanceConnectionInfo o) {
// decide based on address first
byte[] thisAddress = this.inetAddress.getAddress();
byte[] otherAddress = o.inetAddress.getAddress();
if (thisAddress.length < otherAddress.length) {
return -1;
} else if (thisAddress.length > otherAddress.length) {
return 1;
} else {
for (int i = 0; i < thisAddress.length; i++) {
byte tb = thisAddress[i];
byte ob = otherAddress[i];
if (tb < ob) {
return -1;
} else if (tb > ob) {
return 1;
}
}
}
// addresses are identical, decide based on ports.
if (this.ipcPort < o.ipcPort) {
return -1;
} else if (this.ipcPort > o.ipcPort) {
return 1;
} else if (this.dataPort < o.dataPort) {
return -1;
} else if (this.dataPort > o.dataPort) {
return 1;
} else {
return 0;
}
}
}
......@@ -16,39 +16,24 @@
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.topology.NetworkTopology;
/**
* Simple manager that keeps track of which TaskManager are available and alive.
*/
public interface InstanceManager {
InstanceID registerTaskManager(InstanceConnectionInfo instanceConnectionInfo, HardwareDescription resources, int numberOfTaskSlots);
boolean reportHeartBeat(InstanceID instance);
void shutdown();
void releaseAllocatedResource(AllocatedResource allocatedResource) throws InstanceException;
void reportHeartBeat(InstanceConnectionInfo instanceConnectionInfo);
void registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
HardwareDescription hardwareDescription, int numberOfSlots);
void requestInstance(JobID jobID, Configuration conf, int requiredSlots)
throws InstanceException;
NetworkTopology getNetworkTopology(JobID jobID);
void setInstanceListener(InstanceListener instanceListener);
Instance getInstanceByName(String name);
int getNumberOfTaskManagers();
Map<InstanceID, Instance> getAllRegisteredInstances();
int getNumberOfSlots();
int getNumberOfRegisteredTaskManagers();
Map<InstanceConnectionInfo, Instance> getInstances();
int getTotalNumberOfSlots();
}
......@@ -16,13 +16,13 @@
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
......@@ -31,36 +31,55 @@ import org.apache.flink.runtime.taskmanager.TaskManager;
public class LocalInstanceManager extends DefaultInstanceManager {
private List<TaskManager> taskManagers = new ArrayList<TaskManager>();
public LocalInstanceManager() throws Exception{
int numTaskManager = GlobalConfiguration.getInteger(ConfigConstants
.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1);
private final List<TaskManager> taskManagers = new ArrayList<TaskManager>();
ExecutionMode execMode = numTaskManager == 1 ? ExecutionMode.LOCAL : ExecutionMode.CLUSTER;
public LocalInstanceManager(int numTaskManagers) throws Exception {
ExecutionMode execMode = numTaskManagers == 1 ? ExecutionMode.LOCAL : ExecutionMode.CLUSTER;
for (int i=0; i < numTaskManager; i++){
Configuration tm = new Configuration();
int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT);
int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT);
tm.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ipcPort + i);
tm.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + i);
final int ipcPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, -1);
final int dataPort = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, -1);
for (int i = 0; i < numTaskManagers; i++) {
// configure ports, if necessary
if (ipcPort > 0 || dataPort > 0) {
Configuration tm = new Configuration();
if (ipcPort > 0) {
tm.setInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, ipcPort + i);
}
if (dataPort > 0) {
tm.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, dataPort + i);
}
GlobalConfiguration.includeConfiguration(tm);
GlobalConfiguration.includeConfiguration(tm);
}
taskManagers.add(new TaskManager(execMode));
}
}
@Override
public void shutdown(){
for(TaskManager taskManager: taskManagers){
taskManager.shutdown();
public void shutdown() {
try {
for (TaskManager taskManager: taskManagers){
try {
taskManager.shutdown();
}
catch (Throwable t) {
// log and continue in any case
// we initialize the log lazily, because this is the only place we log
// and most likely we never log.
LogFactory.getLog(LocalInstanceManager.class).error("Error shutting down local embedded TaskManager.", t);
}
}
} finally {
this.taskManagers.clear();
super.shutdown();
}
super.shutdown();
}
public TaskManager[] getTaskManagers() {
return (TaskManager[]) this.taskManagers.toArray(new TaskManager[this.taskManagers.size()]);
}
}
......@@ -55,7 +55,6 @@ import org.apache.flink.runtime.managementgraph.ManagementVertex;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
import org.apache.flink.runtime.profiling.ProfilingListener;
import org.apache.flink.runtime.profiling.types.ProfilingEvent;
import org.apache.flink.runtime.topology.NetworkTopology;
/**
* The event collector collects events which occurred during the execution of a job and prepares them
......@@ -311,11 +310,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener
*/
private final Map<JobID, ManagementGraph> recentManagementGraphs = new HashMap<JobID, ManagementGraph>();
/**
* Map of network topologies belonging to recently started jobs with the time stamp of the last received job event.
*/
private final Map<JobID, NetworkTopology> recentNetworkTopologies = new HashMap<JobID, NetworkTopology>();
/**
* The timer used to trigger the cleanup routine.
*/
......@@ -545,10 +539,6 @@ public final class EventCollector extends TimerTask implements ProfilingListener
archiveManagementGraph(entry.getKey(), this.recentManagementGraphs.get(entry.getKey()));
this.recentManagementGraphs.remove(entry.getValue());
}
synchronized (this.recentNetworkTopologies) {
archiveNetworkTopology(entry.getKey(), this.recentNetworkTopologies.get(entry.getKey()));
this.recentNetworkTopologies.remove(entry.getValue());
}
}
}
}
......@@ -669,10 +659,4 @@ public final class EventCollector extends TimerTask implements ProfilingListener
al.archiveManagementGraph(jobId, graph);
}
}
private void archiveNetworkTopology(JobID jobId, NetworkTopology topology) {
for(ArchiveListener al : archivists) {
al.archiveNetworkTopology(jobId, topology);
}
}
}
......@@ -42,8 +42,8 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
......@@ -69,6 +69,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
import org.apache.flink.runtime.executiongraph.GraphConversionException;
import org.apache.flink.runtime.executiongraph.InternalJobStatus;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.instance.DefaultInstanceManager;
import org.apache.flink.runtime.instance.DummyInstance;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
......@@ -106,11 +107,14 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskKillResult;
import org.apache.flink.runtime.taskmanager.TaskSubmissionResult;
import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
import org.apache.flink.runtime.topology.NetworkTopology;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.util.StringUtils;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
/**
* In Nephele the job manager is the central component for communication with clients, creating
......@@ -124,7 +128,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
{
private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
private static final Log LOG = LogFactory.getLog(JobManager.class);
private final Server jobManagerServer;
......@@ -214,11 +218,14 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
LOG.info("Starting job manager in " + executionMode + " mode");
// Try to load the instance manager for the given execution mode
final String instanceManagerClassName = JobManagerUtils.getInstanceManagerClassName(executionMode);
LOG.info("Trying to load " + instanceManagerClassName + " as instance manager");
this.instanceManager = JobManagerUtils.loadInstanceManager(instanceManagerClassName);
if (this.instanceManager == null) {
throw new Exception("Unable to load instance manager " + instanceManagerClassName);
if (executionMode == ExecutionMode.LOCAL) {
this.instanceManager = new Lo
}
else if (executionMode == ExecutionMode.CLUSTER) {
this.instanceManager = new DefaultInstanceManager();
}
else {
throw new IllegalArgumentException("ExecutionMode");
}
// Try to load the scheduler for the given execution mode
......@@ -246,7 +253,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
public void shutdown() {
LOG.debug("JobManager shutdown requested");
if (!this.isShutdownInProgress.compareAndSet(false, true)) {
return;
}
......@@ -272,7 +279,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
try {
this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOG.debug("Got interrupted while waiting for the executor service to shutdown.", e);
LOG.debug(e);
}
}
......@@ -285,14 +292,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
if (this.scheduler != null) {
this.scheduler.shutdown();
}
if(server != null) {
try {
server.stop();
} catch (Exception e) {
LOG.error("Error while shutting down the JobManager's webserver", e);
}
}
this.isShutDown = true;
LOG.debug("Shutdown of job manager completed");
......@@ -306,6 +305,16 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
*/
public static void main(String[] args) {
// determine if a valid log4j config exists and initialize a default logger if not
if (System.getProperty("log4j.configuration") == null) {
Logger root = Logger.getRootLogger();
root.removeAllAppenders();
PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
root.addAppender(appender);
root.setLevel(Level.INFO);
}
JobManager jobManager;
try {
jobManager = initialize(args);
......@@ -313,7 +322,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
jobManager.startInfoServer();
}
catch (Exception e) {
LOG.error(e.getMessage(), e);
LOG.fatal(e.getMessage(), e);
System.exit(FAILURE_RETURN_CODE);
}
......@@ -552,7 +561,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
LibraryCacheManager.unregister(executionGraph.getJobID());
} catch (IOException ioe) {
if (LOG.isWarnEnabled()) {
LOG.warn("IOException while unregistering the job with id " + executionGraph.getJobID() + ".",ioe);
LOG.warn(ioe);
}
}
}
......@@ -826,25 +835,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
return mg;
}
@Override
public NetworkTopology getNetworkTopology(final JobID jobID) throws IOException {
if (this.instanceManager != null) {
return this.instanceManager.getNetworkTopology(jobID);
}
return null;
}
@Override
public IntegerRecord getRecommendedPollingInterval() throws IOException {
return new IntegerRecord(this.recommendedClientPollingInterval);
}
@Override
public List<RecentJobEvent> getRecentJobs() throws IOException {
......@@ -926,7 +921,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
try {
instance.killTaskManager();
} catch (IOException ioe) {
LOG.error("IOException while killing the task manager on instance " + instanceName + ".", ioe);
LOG.error(ioe);
}
}
};
......@@ -1009,7 +1004,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
it2.next().logBufferUtilization();
}
} catch (IOException ioe) {
LOG.error("IOException while logging buffer utilization.", ioe);
LOG.error(ioe);
}
}
......@@ -1021,7 +1016,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
@Override
public int getAvailableSlots() {
return getInstanceManager().getNumberOfSlots();
return getInstanceManager().getTotalNumberOfSlots();
}
......@@ -1175,7 +1170,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
}
public int getNumberOfTaskManagers() {
return this.instanceManager.getNumberOfTaskManagers();
return this.instanceManager.getNumberOfRegisteredTaskManagers();
}
public Map<InstanceConnectionInfo, Instance> getInstances() {
......
......@@ -105,40 +105,6 @@ public class JobManagerUtils {
return scheduler;
}
/**
* Tries to locate a class with given name and to
* instantiate a instance manager from it.
*
* @param instanceManagerClassName
* the name of the class to instantiate the instance manager object from
* @return the {@link org.apache.flink.runtime.instance.InstanceManager} object instantiated from the class with the provided name
*/
@SuppressWarnings("unchecked")
static InstanceManager loadInstanceManager(final String instanceManagerClassName) {
Class<? extends InstanceManager> instanceManagerClass;
try {
instanceManagerClass = (Class<? extends InstanceManager>) Class.forName(instanceManagerClassName);
} catch (ClassNotFoundException e) {
LOG.error("Cannot find class " + instanceManagerClassName + ": " + StringUtils.stringifyException(e));
return null;
}
InstanceManager instanceManager;
try {
instanceManager = instanceManagerClass.newInstance();
} catch (InstantiationException e) {
LOG.error("Cannot create instanceManager: " + StringUtils.stringifyException(e));
return null;
} catch (IllegalAccessException e) {
LOG.error("Cannot create instanceManager: " + StringUtils.stringifyException(e));
return null;
}
return instanceManager;
}
/**
* Tries to read the class name of the {@link org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler} implementation from the global configuration which
* is set to be used for the provided execution mode.
......@@ -150,23 +116,4 @@ public class JobManagerUtils {
static String getSchedulerClassName(ExecutionMode executionMode) {
return "org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler";
}
/**
* Tries to read the class name of the {@link org.apache.flink.runtime.instance.InstanceManager} implementation from the global configuration which is
* set to be used for the provided execution mode.
*
* @param executionMode The Nephele execution mode.
* @return the class name of the {@link org.apache.flink.runtime.instance.InstanceManager} implementation to be used or <code>null</code> if no
* implementation is configured for the given execution mode
*/
static String getInstanceManagerClassName(ExecutionMode executionMode) {
switch (executionMode) {
case LOCAL:
return "org.apache.flink.runtime.instance.LocalInstanceManager";
case CLUSTER:
return "org.apache.flink.runtime.instance.DefaultInstanceManager";
default:
throw new RuntimeException("Unrecognized Execution Mode.");
}
}
}
......@@ -28,7 +28,6 @@ import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
import org.apache.flink.runtime.topology.NetworkTopology;
/**
* Interface used to implement Archivists, that store old jobmanager information discarded by the EventCollector.
......@@ -60,14 +59,6 @@ public interface ArchiveListener {
*/
void archiveManagementGraph(JobID jobId, ManagementGraph graph);
/**
* Stores old NetworkTopology in Archive
*
* @param jobId
* @param topology
*/
void archiveNetworkTopology(JobID jobId, NetworkTopology topology);
/**
* Get all archived Jobs
*
......
......@@ -34,7 +34,6 @@ import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
import org.apache.flink.runtime.topology.NetworkTopology;
/**
* Implementation of the ArchiveListener, that archives old data of the jobmanager in memory
......@@ -60,10 +59,6 @@ public class MemoryArchivist implements ArchiveListener {
*/
private final Map<JobID, ManagementGraph> managementGraphs = new HashMap<JobID, ManagementGraph>();
/**
* Map of network topologies belonging to recently started jobs with the time stamp of the last received job event.
*/
private final Map<JobID, NetworkTopology> networkTopologies = new HashMap<JobID, NetworkTopology>();
private final LinkedList<JobID> lru = new LinkedList<JobID>();
......@@ -96,13 +91,6 @@ public class MemoryArchivist implements ArchiveListener {
cleanup(jobId);
}
public void archiveNetworkTopology(JobID jobId, NetworkTopology topology) {
networkTopologies.put(jobId, topology);
cleanup(jobId);
}
public List<RecentJobEvent> getJobs() {
......@@ -118,7 +106,6 @@ public class MemoryArchivist implements ArchiveListener {
collectedEvents.remove(toRemove);
oldJobs.remove(toRemove);
managementGraphs.remove(toRemove);
networkTopologies.remove(toRemove);
}
}
......
......@@ -92,7 +92,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
writeJsonForArchivedJobGroupvertex(resp.getWriter(), jobmanager.getArchive().getJob(JobID.fromHexString(jobId)), ManagementGroupVertexID.fromHexString(groupvertexId));
}
else if("taskmanagers".equals(req.getParameter("get"))) {
resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +", \"slots\": "+jobmanager.getAvailableSlots()+"}");
resp.getWriter().write("{\"taskmanagers\": " + jobmanager.getNumberOfTaskManagers() +"}");
}
else if("cancel".equals(req.getParameter("get"))) {
String jobId = req.getParameter("job");
......
......@@ -125,9 +125,10 @@ public class SetupInfoServlet extends HttpServlet {
objInner.put("timeSinceLastHeartbeat", time / 1000);
objInner.put("slotsNumber", instance.getNumberOfSlots());
objInner.put("freeSlots", instance.getNumberOfAvailableSlots());
objInner.put("cpuCores", instance.getHardwareDescription().getNumberOfCPUCores());
objInner.put("physicalMemory", instance.getHardwareDescription().getSizeOfPhysicalMemory() / 1048576);
objInner.put("freeMemory", instance.getHardwareDescription().getSizeOfFreeMemory() / 1048576);
objInner.put("cpuCores", instance.getResources().getNumberOfCPUCores());
objInner.put("physicalMemory", instance.getResources().getSizeOfPhysicalMemory() >>> 20);
objInner.put("freeMemory", instance.getResources().getSizeOfJvmHeap() >>> 20);
objInner.put("managedMemory", instance.getResources().getSizeOfManagedMemory() >>> 20);
array.put(objInner);
} catch (JSONException e) {
LOG.warn("Json object creation failed", e);
......
......@@ -28,14 +28,12 @@ import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
import org.apache.flink.runtime.topology.NetworkTopology;
/**
* This protocol provides extended management capabilities beyond the
* simple {@link JobManagementProtocol}. It can be used to retrieve
* internal scheduling information, the network topology, or profiling
* information about thread or instance utilization.
*
*/
public interface ExtendedManagementProtocol extends JobManagementProtocol {
......@@ -51,18 +49,6 @@ public interface ExtendedManagementProtocol extends JobManagementProtocol {
*/
ManagementGraph getManagementGraph(JobID jobID) throws IOException;
/**
* Retrieves the current network topology for the job with
* the given ID.
*
* @param jobID
* the ID identifying the job
* @return the network topology for the job
* @throws IOException
* thrown if an error occurs while retrieving the network topology
*/
NetworkTopology getNetworkTopology(JobID jobID) throws IOException;
/**
* Retrieves a list of jobs which have either running or have been started recently.
*
......
......@@ -74,7 +74,6 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheUpdate;
import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.HardwareDescriptionFactory;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.ChannelManager;
......@@ -119,6 +118,15 @@ public class TaskManager implements TaskOperationProtocol {
public final static String ARG_CONF_DIR = "tempDir";
// --------------------------------------------------------------------------------------------
private final InstanceConnectionInfo localInstanceConnectionInfo;
private final HardwareDescription hardwareDescription;
private final ExecutionMode executionMode;
private final JobManagerProtocol jobManager;
private final InputSplitProviderProtocol globalInputSplitProvider;
......@@ -132,6 +140,7 @@ public class TaskManager implements TaskOperationProtocol {
private final Server taskManagerServer;
private final FileCache fileCache = new FileCache();
/**
* This map contains all the tasks whose threads are in a state other than TERMINATED. If any task
* is stored inside this map and its thread status is TERMINATED, this indicates a virtual machine error.
......@@ -139,8 +148,6 @@ public class TaskManager implements TaskOperationProtocol {
*/
private final Map<ExecutionVertexID, Task> runningTasks = new ConcurrentHashMap<ExecutionVertexID, Task>();
private final InstanceConnectionInfo localInstanceConnectionInfo;
/**
* The instance of the {@link ChannelManager} which is responsible for
* setting up and cleaning up the byte buffered channels of the tasks.
......@@ -156,8 +163,6 @@ public class TaskManager implements TaskOperationProtocol {
private final IOManager ioManager;
private final HardwareDescription hardwareDescription;
private final int numberOfSlots;
private final Thread heartbeatThread;
......@@ -175,8 +180,9 @@ public class TaskManager implements TaskOperationProtocol {
if (executionMode == null) {
throw new NullPointerException("Execution mode must not be null.");
}
LOG.info("Execution mode: " + executionMode);
this.executionMode = executionMode;
// IMPORTANT! At this point, the GlobalConfiguration must have been read!
......@@ -352,8 +358,6 @@ public class TaskManager implements TaskOperationProtocol {
this.numberOfSlots = slots;
}
this.hardwareDescription = HardwareDescriptionFactory.extractFromSystem();
// initialize the memory manager
{
// Check whether the memory size has been explicitly configured.
......@@ -363,7 +367,7 @@ public class TaskManager implements TaskOperationProtocol {
if (configuredMemorySize == -1) {
// no manually configured memory. take a relative fraction of the free heap space
float fraction = GlobalConfiguration.getFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, ConfigConstants.DEFAULT_MEMORY_MANAGER_MEMORY_FRACTION);
memorySize = (long) (this.hardwareDescription.getSizeOfFreeMemory() * fraction);
memorySize = (long) (EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag() * fraction);
LOG.info("Using " + fraction + " of the free heap space for managed memory.");
}
else if (configuredMemorySize <= 0) {
......@@ -391,6 +395,8 @@ public class TaskManager implements TaskOperationProtocol {
throw new Exception("Unable to initialize memory manager.", t);
}
}
this.hardwareDescription = HardwareDescription.extractFromSystem(this.memoryManager.getMemorySize());
this.ioManager = new IOManager(tmpDirPaths);
......@@ -1117,6 +1123,22 @@ public class TaskManager implements TaskOperationProtocol {
}
}
}
// --------------------------------------------------------------------------------------------
// Properties
// --------------------------------------------------------------------------------------------
public InstanceConnectionInfo getConnectionInfo() {
return this.localInstanceConnectionInfo;
}
public ExecutionMode getExecutionMode() {
return this.executionMode;
}
// --------------------------------------------------------------------------------------------
// Memory and Garbace Collection Debugging Utilities
// --------------------------------------------------------------------------------------------
private String getMemoryUsageStatsAsString(MemoryMXBean memoryMXBean) {
MemoryUsage heap = memoryMXBean.getHeapMemoryUsage();
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.topology;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.io.StringRecord;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class NetworkNode implements IOReadableWritable {
private final NetworkTopology networkTopology;
private String name = null;
private final NetworkNode parentNode;
private final List<NetworkNode> childNodes = new ArrayList<NetworkNode>();
protected NetworkNode(final String name, final NetworkNode parentNode, final NetworkTopology networkTopology) {
this.name = name;
this.parentNode = parentNode;
this.networkTopology = networkTopology;
if (this.parentNode != null) {
this.parentNode.addChild(this);
}
if (this.networkTopology != null) {
this.networkTopology.addNode(this);
}
}
NetworkNode(final NetworkNode parentNode, final NetworkTopology networkTopology) {
this.parentNode = parentNode;
this.networkTopology = networkTopology;
// The node will add itself when it is fully deserialized
}
private void addChild(final NetworkNode child) {
this.childNodes.add(child);
}
public void remove() {
if (!isLeafNode()) {
return;
}
if (this.parentNode != null) {
this.parentNode.removeChild(this);
}
if (this.networkTopology != null) {
this.networkTopology.removeNode(this);
}
}
private void removeChild(final NetworkNode child) {
this.childNodes.remove(child);
}
public boolean isRootNode() {
return (this.parentNode == null);
}
public boolean isLeafNode() {
return this.childNodes.isEmpty();
}
public String getName() {
return this.name;
}
public int getDepth() {
if (this.isRootNode()) {
return 1;
}
return (1 + this.parentNode.getDepth());
}
public int getHeight() {
int maxHeight = 0;
final Iterator<NetworkNode> it = this.childNodes.iterator();
while (it.hasNext()) {
final int height = it.next().getHeight();
if (height > maxHeight) {
maxHeight = height;
}
}
return (1 + maxHeight);
}
public int getNumberOfChildNodes() {
return this.childNodes.size();
}
public NetworkNode getChildNode(final int index) {
if (index < this.childNodes.size()) {
return this.childNodes.get(index);
}
return null;
}
public NetworkNode getParentNode() {
return this.parentNode;
}
/**
* Returns the network topology that is associated with this network node.
*
* @return the network topology that is associated with this network node
*/
public NetworkTopology getNetworkTopology() {
return this.networkTopology;
}
/**
* Determines the distance to the given network node. The distance is determined as the number of internal network
* nodes that must be traversed in order to send a packet from one node to the other plus one.
*
* @param networkNode
* the node to determine the distance for
* @return the distance to the given network node or <code>Integer.MAX_VALUE</code> if the given node is not part of
* this node's network topology
*/
public int getDistance(final NetworkNode networkNode) {
int steps = 0;
NetworkNode tmp = this;
while (tmp != null) {
final int distance = tmp.isPredecessorOrSelfOf(networkNode);
if (distance >= 0) {
return (steps + distance);
}
tmp = tmp.getParentNode();
++steps;
}
return Integer.MAX_VALUE;
}
/**
* Checks whether this node is a predecessor or the identity (the node itself) of the given network node in the
* network topology tree.
*
* @param networkNode
* a potential child network node
* @return If this node node is a predecessor of given node in the network topology tree, the return value
* indicates the distance between both nodes. If the given node equals this node, the
* return value is <code>0</code>. Otherwise the return value is <code>-1</code>.
*/
private int isPredecessorOrSelfOf(final NetworkNode networkNode) {
NetworkNode tmp = networkNode;
int steps = 0;
while (tmp != null) {
if (this.equals(tmp)) {
return steps;
}
tmp = tmp.getParentNode();
++steps;
}
return -1;
}
public int getDistance(final String nodeName) {
final NetworkNode networkNode = this.networkTopology.getNodeByName(nodeName);
if (networkNode == null) {
return Integer.MAX_VALUE;
}
if (this.equals(networkNode)) {
return 0;
}
return getDistance(networkNode);
}
@Override
public void read(final DataInputView in) throws IOException {
this.name = StringRecord.readString(in);
// We need to read the name before we can add the node to the topology's node map
if (this.networkTopology != null) {
this.networkTopology.addNode(this);
}
final int numberOfChildNodes = in.readInt();
for (int i = 0; i < numberOfChildNodes; i++) {
final NetworkNode networkNode = new NetworkNode(this, this.networkTopology);
networkNode.read(in);
this.childNodes.add(networkNode);
}
}
@Override
public void write(final DataOutputView out) throws IOException {
StringRecord.writeString(out, this.name);
out.writeInt(this.childNodes.size());
final Iterator<NetworkNode> it = this.childNodes.iterator();
while (it.hasNext()) {
it.next().write(out);
}
}
@Override
public String toString() {
String str;
if (this.childNodes.isEmpty()) {
str = this.name;
} else {
final Iterator<NetworkNode> it = this.childNodes.iterator();
final StringBuffer buf = new StringBuffer("[");
while (it.hasNext()) {
buf.append(it.next().toString());
if (it.hasNext()) {
buf.append(", ");
}
}
buf.append("]");
str = buf.toString();
}
return str;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.topology;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
public class NetworkTopology implements IOReadableWritable {
private static final String DEFAULT_ROOT_NODE_NAME = "root";
private static final String SEPARATOR = "/";
private final NetworkNode rootNode;
private final Map<String, NetworkNode> nodeMap = new HashMap<String, NetworkNode>();
private Object attachment = null;
public NetworkTopology() {
this.rootNode = new NetworkNode(DEFAULT_ROOT_NODE_NAME, null, this);
}
public static NetworkTopology fromFile(final File topologyFile) throws IOException {
// First create root node and topology object
final NetworkTopology topology = new NetworkTopology();
// Now read the topology file
final FileInputStream fstream = new FileInputStream(topologyFile);
final DataInputStream in = new DataInputStream(fstream);
final BufferedReader br = new BufferedReader(new InputStreamReader(in));
String strLine;
final Pattern pattern = Pattern.compile("^(\\S+)\\s*(\\S*)\\s*$");
try {
while ((strLine = br.readLine()) != null && !strLine.isEmpty()) {
final Matcher m = pattern.matcher(strLine);
if (!m.matches()) {
throw new IOException("Cannot extract topology information from line \"" + strLine + "\"");
}
strLine = m.group(1);
// Remove leading SEPARATOR
if (strLine.charAt(0) == SEPARATOR.charAt(0)) {
strLine = strLine.substring(1);
}
final String[] splits = strLine.split(SEPARATOR);
NetworkNode previousNode = topology.getRootNode();
for (int i = 0; i < splits.length; i++) {
NetworkNode networkNode = topology.getNodeByName(splits[i]);
if (networkNode == null) {
networkNode = new NetworkNode(splits[i], previousNode, topology);
}
previousNode = networkNode;
}
}
} finally {
// Close reader
br.close();
}
return topology;
}
public static NetworkTopology fromFile(final String topologyFileName) throws IOException {
return fromFile(new File(topologyFileName));
}
public static NetworkTopology createEmptyTopology() {
return new NetworkTopology();
}
void addNode(final NetworkNode networkNode) {
this.nodeMap.put(networkNode.getName(), networkNode);
}
public NetworkNode getNodeByName(final String name) {
return this.nodeMap.get(name);
}
public NetworkNode getRootNode() {
return this.rootNode;
}
@Override
public void read(final DataInputView in) throws IOException {
this.rootNode.read(in);
}
@Override
public void write(final DataOutputView out) throws IOException {
this.rootNode.write(out);
}
public int getDepth() {
return this.rootNode.getHeight();
}
@Override
public String toString() {
return this.rootNode.toString();
}
void removeNode(NetworkNode networkNode) {
this.nodeMap.remove(networkNode.getName());
}
public Iterator<NetworkNode> iterator() {
return new NetworkTopologyIterator(this);
}
public void setAttachment(Object attachment) {
this.attachment = attachment;
}
public Object getAttachment() {
return this.attachment;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.topology;
import java.util.Iterator;
import java.util.Stack;
public class NetworkTopologyIterator implements Iterator<NetworkNode> {
private static class TraversalEntry {
private NetworkNode networkNode;
private int childrenVisited = 0;
public TraversalEntry(final NetworkNode networkNode) {
this.networkNode = networkNode;
}
public NetworkNode getNetworkNode() {
return this.networkNode;
}
public int getChildrenVisited() {
return this.childrenVisited;
}
public void increaseChildrenVisited() {
++this.childrenVisited;
}
}
private Stack<TraversalEntry> traversalStack = new Stack<TraversalEntry>();
NetworkTopologyIterator(final NetworkTopology networkTopology) {
traversalStack.add(new TraversalEntry(networkTopology.getRootNode()));
refillStack();
}
private void refillStack() {
while (true) {
final TraversalEntry traversalEntry = this.traversalStack.peek();
final NetworkNode networkNode = traversalEntry.getNetworkNode();
if (networkNode.isLeafNode()) {
break;
}
final NetworkNode childNode = networkNode.getChildNode(traversalEntry.getChildrenVisited());
this.traversalStack.add(new TraversalEntry(childNode));
}
}
@Override
public boolean hasNext() {
if (this.traversalStack.isEmpty()) {
return false;
}
return true;
}
@Override
public NetworkNode next() {
final TraversalEntry traversalEntry = this.traversalStack.pop();
final NetworkNode networkNode = traversalEntry.networkNode;
if (!this.traversalStack.isEmpty()) {
final TraversalEntry parentTraversalEntry = this.traversalStack.peek();
parentTraversalEntry.increaseChildrenVisited();
if (parentTraversalEntry.getChildrenVisited() < parentTraversalEntry.getNetworkNode()
.getNumberOfChildNodes()) {
refillStack();
}
}
return networkNode;
}
@Override
public void remove() {
// Optional operation
}
}
......@@ -36,12 +36,13 @@ public class EnvironmentInformation {
/**
* Returns the version of the code as String. If version == null, then the JobManager does not run from a
* maven build. An example is a source code checkout, compile, and run from inside an IDE.
* Maven build. An example is a source code checkout, compile, and run from inside an IDE.
*
* @return The version string.
*/
public static String getVersion() {
return EnvironmentInformation.class.getPackage().getImplementationVersion();
String version = EnvironmentInformation.class.getPackage().getImplementationVersion();
return version != null ? version : UNKNOWN;
}
/**
......@@ -78,6 +79,11 @@ public class EnvironmentInformation {
public String commitDate;
}
/**
* Gets the name of the user that is running the JVM.
*
* @return The name of the user that is running the JVM.
*/
public static String getUserRunning() {
try {
return UserGroupInformation.getCurrentUser().getShortUserName();
......@@ -98,10 +104,44 @@ public class EnvironmentInformation {
return user;
}
public static long getMaxJvmMemory() {
return Runtime.getRuntime().maxMemory() >>> 20;
/**
* The maximum JVM heap size, in bytes.
*
* @return The maximum JVM heap size, in bytes.
*/
public static long getMaxJvmHeapMemory() {
return Runtime.getRuntime().maxMemory();
}
/**
* Gets an estimate of the size of the free heap memory.
*
* NOTE: This method is heavy-weight. It triggers a garbage collection to reduce fragmentation and get
* a better estimate at the size of free memory. It is typically more accurate than the plain version
* {@link #getSizeOfFreeHeapMemory()}.
*
* @return An estimate of the size of the free heap memory, in bytes.
*/
public static long getSizeOfFreeHeapMemoryWithDefrag() {
// trigger a garbage collection, to reduce fragmentation
System.gc();
return getSizeOfFreeHeapMemory();
}
/**
* Gets an estimate of the size of the free heap memory. The estimate may vary, depending on the current
* level of memory fragmentation and the number of dead objects. For a better (but more heavy-weight)
* estimate, use {@link #getSizeOfFreeHeapMemoryWithDefrag()}.
*
* @return An estimate of the size of the free heap memory, in bytes.
*/
public static long getSizeOfFreeHeapMemory() {
Runtime r = Runtime.getRuntime();
return r.maxMemory() - r.totalMemory() + r.freeMemory();
}
public static String getJvmVersion() {
try {
final RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
......@@ -135,7 +175,7 @@ public class EnvironmentInformation {
String javaHome = System.getenv("JAVA_HOME");
long memory = getMaxJvmMemory();
long maxHeapMegabytes = getMaxJvmHeapMemory() >>> 20;
log.info("-------------------------------------------------------");
log.info(" Starting " + componentName + " (Version: " + version + ", "
......@@ -143,7 +183,7 @@ public class EnvironmentInformation {
log.info(" Current user: " + user);
log.info(" JVM: " + jvmVersion);
log.info(" Startup Options: " + options);
log.info(" Maximum heap size: " + memory + " MiBytes");
log.info(" Maximum heap size: " + maxHeapMegabytes + " MiBytes");
log.info(" JAVA_HOME: " + (javaHome == null ? "not set" : javaHome));
log.info("-------------------------------------------------------");
}
......
......@@ -54,7 +54,7 @@ public class AbstractIDTest {
final ChannelID origID = new ChannelID();
try {
final ChannelID copyID = (ChannelID) CommonTestUtils.createCopy(origID);
final ChannelID copyID = (ChannelID) CommonTestUtils.createCopyWritable(origID);
assertEquals(origID.hashCode(), copyID.hashCode());
assertEquals(origID, copyID);
......
......@@ -55,7 +55,7 @@ public class JobResultTest {
try {
final JobCancelResult copy = (JobCancelResult) CommonTestUtils.createCopy(orig);
final JobCancelResult copy = (JobCancelResult) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getReturnCode(), copy.getReturnCode());
assertEquals(orig.getDescription(), copy.getDescription());
assertEquals(orig.hashCode(), copy.hashCode());
......@@ -78,7 +78,7 @@ public class JobResultTest {
try {
final JobProgressResult copy = (JobProgressResult) CommonTestUtils.createCopy(orig);
final JobProgressResult copy = (JobProgressResult) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getReturnCode(), copy.getReturnCode());
assertEquals(orig.getDescription(), copy.getDescription());
assertEquals(orig.hashCode(), copy.hashCode());
......@@ -99,7 +99,7 @@ public class JobResultTest {
try {
final JobSubmissionResult copy = (JobSubmissionResult) CommonTestUtils.createCopy(orig);
final JobSubmissionResult copy = (JobSubmissionResult) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getReturnCode(), copy.getReturnCode());
assertEquals(orig.getDescription(), copy.getDescription());
assertEquals(orig.hashCode(), copy.hashCode());
......
......@@ -46,7 +46,7 @@ public class JobEventTest {
try {
final JobEvent orig = new JobEvent(1234567L, JobStatus.FINISHED, null);
final JobEvent copy = (JobEvent) CommonTestUtils.createCopy(orig);
final JobEvent copy = (JobEvent) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getTimestamp(), copy.getTimestamp());
assertEquals(orig.getCurrentJobStatus(), copy.getCurrentJobStatus());
......@@ -68,7 +68,7 @@ public class JobEventTest {
final VertexEvent orig = new VertexEvent(23423423L, new JobVertexID(), "Test Vertex", 2, 0,
ExecutionState.READY, "Test Description");
final VertexEvent copy = (VertexEvent) CommonTestUtils.createCopy(orig);
final VertexEvent copy = (VertexEvent) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getTimestamp(), copy.getTimestamp());
assertEquals(orig.getJobVertexID(), copy.getJobVertexID());
......
......@@ -44,7 +44,7 @@ public class TaskEventTest {
try {
final IntegerTaskEvent orig = new IntegerTaskEvent(11);
final IntegerTaskEvent copy = (IntegerTaskEvent) CommonTestUtils.createCopy(orig);
final IntegerTaskEvent copy = (IntegerTaskEvent) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getInteger(), copy.getInteger());
assertEquals(orig.hashCode(), copy.hashCode());
......@@ -64,7 +64,7 @@ public class TaskEventTest {
try {
final StringTaskEvent orig = new StringTaskEvent("Test");
final StringTaskEvent copy = (StringTaskEvent) CommonTestUtils.createCopy(orig);
final StringTaskEvent copy = (StringTaskEvent) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getString(), copy.getString());
assertEquals(orig.hashCode(), copy.hashCode());
......@@ -86,7 +86,7 @@ public class TaskEventTest {
final EventList orig = new EventList();
orig.add(new StringTaskEvent("Test 2"));
orig.add(new IntegerTaskEvent(70));
final EventList copy = (EventList) CommonTestUtils.createCopy(orig);
final EventList copy = (EventList) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.size(), copy.size());
final Iterator<AbstractEvent> origIt = orig.iterator();
......
......@@ -16,22 +16,22 @@
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.LogUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -40,34 +40,34 @@ import org.junit.Test;
* Tests for {@link org.apache.flink.runtime.instance.DefaultInstanceManager}.
*/
public class DefaultInstanceManagerTest {
@Test
public void testInstanceRegistering() {
try {
DefaultInstanceManager cm = new DefaultInstanceManager();
TestInstanceListener testInstanceListener = new TestInstanceListener();
cm.setInstanceListener(testInstanceListener);
int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
final int ipcPort = 10000;
final int dataPort = 20000;
HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(2, 2L * 1024L * 1024L * 1024L,
2L * 1024L * 1024L * 1024L);
HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096);
String hostname = "192.168.198.1";
InetAddress address = InetAddress.getByName("192.168.198.1");
InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 0, dataPort + 0);
InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 15, dataPort + 15);
InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 30, dataPort + 30);
InetAddress address = InetAddress.getByName("127.0.0.1");
// register three instances
cm.registerTaskManager(ici1, hardwareDescription, 1);
cm.registerTaskManager(ici2, hardwareDescription, 1);
cm.registerTaskManager(ici3, hardwareDescription, 1);
InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, ipcPort + 0, dataPort + 0);
InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, ipcPort + 15, dataPort + 15);
InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, ipcPort + 30, dataPort + 30);
assertEquals(3, cm.getNumberOfSlots());
InstanceID i1 = cm.registerTaskManager(ici1, hardwareDescription, 1);
InstanceID i2 = cm.registerTaskManager(ici2, hardwareDescription, 2);
InstanceID i3 = cm.registerTaskManager(ici3, hardwareDescription, 5);
assertEquals(3, cm.getNumberOfRegisteredTaskManagers());
assertEquals(8, cm.getTotalNumberOfSlots());
assertEquals(ici1, cm.getAllRegisteredInstances().get(i1).getInstanceConnectionInfo());
assertEquals(ici2, cm.getAllRegisteredInstances().get(i2).getInstanceConnectionInfo());
assertEquals(ici3, cm.getAllRegisteredInstances().get(i3).getInstanceConnectionInfo());
cm.shutdown();
}
......@@ -77,77 +77,86 @@ public class DefaultInstanceManagerTest {
Assert.fail("Test erroneous: " + e.getMessage());
}
}
@Test
public void testAllocationDeallocation() {
public void testRegisteringAlreadyRegistered() {
try {
DefaultInstanceManager cm = new DefaultInstanceManager();
TestInstanceListener testInstanceListener = new TestInstanceListener();
cm.setInstanceListener(testInstanceListener);
int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
final int ipcPort = 10000;
final int dataPort = 20000;
HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(2, 2L * 1024L * 1024L * 1024L,
2L * 1024L * 1024L * 1024L);
HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo ici = new InstanceConnectionInfo(address, ipcPort + 0, dataPort + 0);
InstanceID i = cm.registerTaskManager(ici, resources, 1);
String hostname = "192.168.198.1";
InetAddress address = InetAddress.getByName("192.168.198.1");
assertNotNull(i);
assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
assertEquals(1, cm.getTotalNumberOfSlots());
InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 0, dataPort + 0);
InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 15, dataPort + 15);
InstanceID next = cm.registerTaskManager(ici, resources, 1);
assertNull(next);
// register three instances
cm.registerTaskManager(ici1, hardwareDescription, 1);
cm.registerTaskManager(ici2, hardwareDescription, 1);
assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
assertEquals(1, cm.getTotalNumberOfSlots());
assertEquals(2, cm.getNumberOfSlots());
cm.shutdown();
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Test erroneous: " + e.getMessage());
}
}
@Test
public void testReportHeartbeat() {
try {
DefaultInstanceManager cm = new DefaultInstanceManager();
final int ipcPort = 10000;
final int dataPort = 20000;
HardwareDescription hardwareDescription = HardwareDescription.extractFromSystem(4096);
InetAddress address = InetAddress.getByName("127.0.0.1");
// allocate something
JobID jobID = new JobID();
Configuration conf = new Configuration();
cm.requestInstance(jobID, conf, 2);
// register three instances
InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, ipcPort + 0, dataPort + 0);
InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, ipcPort + 1, dataPort + 1);
InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, ipcPort + 2, dataPort + 2);
DefaultInstanceManagerTestUtils.waitForInstances(jobID, testInstanceListener, 3, 1000);
InstanceID i1 = cm.registerTaskManager(ici1, hardwareDescription, 1);
InstanceID i2 = cm.registerTaskManager(ici2, hardwareDescription, 1);
InstanceID i3 = cm.registerTaskManager(ici3, hardwareDescription, 1);
// report some immediate heart beats
assertTrue(cm.reportHeartBeat(i1));
assertTrue(cm.reportHeartBeat(i2));
assertTrue(cm.reportHeartBeat(i3));
List<AllocatedResource> allocatedResources = testInstanceListener.getAllocatedResourcesForJob(jobID);
assertEquals(2, allocatedResources.size());
// report heart beat for non-existing instance
assertFalse(cm.reportHeartBeat(new InstanceID()));
Iterator<AllocatedResource> it = allocatedResources.iterator();
Set<AllocationID> allocationIDs = new HashSet<AllocationID>();
while (it.hasNext()) {
AllocatedResource allocatedResource = it.next();
if (allocationIDs.contains(allocatedResource.getAllocationID())) {
fail("Discovered allocation ID " + allocatedResource.getAllocationID() + " at least twice");
} else {
allocationIDs.add(allocatedResource.getAllocationID());
}
}
// Try to allocate more resources which must result in an error
try {
cm.requestInstance(jobID, conf, 3);
fail("ClusterManager allowed to request more instances than actually available");
} catch (InstanceException ie) {
// Exception is expected and correct behavior here
}
// Release all allocated resources
it = allocatedResources.iterator();
while (it.hasNext()) {
final AllocatedResource allocatedResource = it.next();
cm.releaseAllocatedResource(allocatedResource);
}
final long WAIT = 200;
CommonTestUtils.sleepUninterruptibly(WAIT);
// Now further allocations should be possible
long h1 = cm.getAllRegisteredInstances().get(i1).getLastHeartBeat();
long h2 = cm.getAllRegisteredInstances().get(i2).getLastHeartBeat();
long h3 = cm.getAllRegisteredInstances().get(i3).getLastHeartBeat();
// send one heart beat again and verify that the
assertTrue(cm.reportHeartBeat(i1));
long newH1 = cm.getAllRegisteredInstances().get(i1).getLastHeartBeat();
cm.requestInstance(jobID, conf, 1);
long now = System.currentTimeMillis();
assertTrue(now - h1 >= WAIT);
assertTrue(now - h2 >= WAIT);
assertTrue(now - h3 >= WAIT);
assertTrue(now - newH1 <= WAIT);
cm.shutdown();
}
......@@ -157,6 +166,39 @@ public class DefaultInstanceManagerTest {
Assert.fail("Test erroneous: " + e.getMessage());
}
}
@Test
public void testShutdown() {
try {
DefaultInstanceManager cm = new DefaultInstanceManager();
cm.shutdown();
try {
HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo ici = new InstanceConnectionInfo(address, 10000, 20000);
cm.registerTaskManager(ici, resources, 1);
fail("Should raise exception in shutdown state");
}
catch (IllegalStateException e) {
// expected
}
try {
cm.reportHeartBeat(new InstanceID());
fail("Should raise exception in shutdown state");
}
catch (IllegalStateException e) {
// expected
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Test erroneous: " + e.getMessage());
}
}
/**
* This test checks the clean-up routines of the cluster manager.
......@@ -164,58 +206,67 @@ public class DefaultInstanceManagerTest {
@Test
public void testCleanUp() {
try {
final int CLEANUP_INTERVAL = 2;
// configure a short cleanup interval
Configuration config = new Configuration();
config.setInteger("instancemanager.cluster.cleanupinterval", CLEANUP_INTERVAL);
GlobalConfiguration.includeConfiguration(config);
DefaultInstanceManager cm = new DefaultInstanceManager();
TestInstanceListener testInstanceListener = new TestInstanceListener();
cm.setInstanceListener(testInstanceListener);
int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
DefaultInstanceManager cm = new DefaultInstanceManager(200, 100);
HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(2, 2L * 1024L * 1024L * 1024L,
2L * 1024L * 1024L * 1024L);
String hostname = "192.168.198.1";
InetAddress address = InetAddress.getByName("192.168.198.1");
InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 0, dataPort + 0);
InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 15, dataPort + 15);
InstanceConnectionInfo ici3 = new InstanceConnectionInfo(address, hostname, null, ipcPort + 30, dataPort + 30);
HardwareDescription resources = HardwareDescription.extractFromSystem(4096);
InetAddress address = InetAddress.getByName("127.0.0.1");
InstanceConnectionInfo ici1 = new InstanceConnectionInfo(address, 10000, 20000);
InstanceConnectionInfo ici2 = new InstanceConnectionInfo(address, 10001, 20001);
// register three instances
cm.registerTaskManager(ici1, hardwareDescription, 1);
cm.registerTaskManager(ici2, hardwareDescription, 1);
cm.registerTaskManager(ici3, hardwareDescription, 1);
assertEquals(3, cm.getNumberOfSlots());
// request some instances
JobID jobID = new JobID();
Configuration conf = new Configuration();
cm.requestInstance(jobID, conf, 1);
InstanceID i1 = cm.registerTaskManager(ici1, resources, 1);
InstanceID i2 = cm.registerTaskManager(ici2, resources, 1);
DefaultInstanceManagerTestUtils.waitForInstances(jobID, testInstanceListener, 1, 1000);
assertEquals(1, testInstanceListener.getNumberOfAllocatedResourcesForJob(jobID));
// wait for the cleanup to kick in
Thread.sleep(2000 * CLEANUP_INTERVAL);
// check that the instances are gone
DefaultInstanceManagerTestUtils.waitForInstances(jobID, testInstanceListener, 0, 1000);
assertEquals(0, testInstanceListener.getNumberOfAllocatedResourcesForJob(jobID));
assertEquals(0, cm.getNumberOfSlots());
assertNotNull(i1);
assertNotNull(i2);
assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
assertEquals(2, cm.getTotalNumberOfSlots());
// report a few heatbeats for both of the machines (each 50 msecs)...
for (int i = 0; i < 8; i++) {
CommonTestUtils.sleepUninterruptibly(50);
assertTrue(cm.reportHeartBeat(i1));
assertTrue(cm.reportHeartBeat(i2));
}
// all should be alive
assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
assertEquals(2, cm.getTotalNumberOfSlots());
// report a few heatbeats for both only one machine
for (int i = 0; i < 8; i++) {
CommonTestUtils.sleepUninterruptibly(50);
assertTrue(cm.reportHeartBeat(i1));
}
// we should have lost one TM by now
assertEquals(1, cm.getNumberOfRegisteredTaskManagers());
assertEquals(1, cm.getTotalNumberOfSlots());
// if the lost TM reports, it should not be accepted
assertFalse(cm.reportHeartBeat(i2));
// allow the lost TM to re-register itself
i2 = cm.registerTaskManager(ici2, resources, 1);
assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
assertEquals(2, cm.getTotalNumberOfSlots());
// report a few heatbeats for both of the machines (each 50 msecs)...
for (int i = 0; i < 8; i++) {
CommonTestUtils.sleepUninterruptibly(50);
assertTrue(cm.reportHeartBeat(i1));
assertTrue(cm.reportHeartBeat(i2));
}
// all should be alive
assertEquals(2, cm.getNumberOfRegisteredTaskManagers());
assertEquals(2, cm.getTotalNumberOfSlots());
cm.shutdown();
}
catch (Exception e) {
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.jobgraph.JobID;
/**
* This class contains utility methods used during the tests of the {@link org.apache.flink.runtime.instance.DefaultInstanceManager} implementation.
*
*/
public class DefaultInstanceManagerTestUtils {
/**
* Granularity of the sleep time.
*/
private static final long SLEEP_TIME = 10; // 10 milliseconds
/**
* Private constructor so the class cannot be instantiated.
*/
private DefaultInstanceManagerTestUtils() {
}
/**
* Waits until a specific number of instances have registered or deregistrations with the given
* {@link InstanceListener} object for a given job or the maximum wait time has elapsed.
*
* @param jobID
* the ID of the job to check the instance registration for
* @param instanceListener
* the listener which shall be notified when a requested instance is available for the job
* @param numberOfInstances
* the number of registered instances to wait for
* @param maxWaitTime
* the maximum wait time before this method returns
*/
public static void waitForInstances(JobID jobID, TestInstanceListener instanceListener,
int numberOfInstances, long maxWaitTime) {
final long startTime = System.currentTimeMillis();
while (instanceListener.getNumberOfAllocatedResourcesForJob(jobID) != numberOfInstances) {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
break;
}
if ((System.currentTimeMillis() - startTime) >= maxWaitTime) {
break;
}
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package org.apache.flink.runtime.instance;
import static org.junit.Assert.*;
import org.junit.Test;
public class HardwareTest {
@Test
public void testCpuCores() {
try {
assertTrue(Hardware.getNumberCPUCores() >= 0);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testPhysicalMemory() {
try {
long physMem = Hardware.getSizeOfPhysicalMemory();
assertTrue(physMem >= -1);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.topology.NetworkTopology;
import org.junit.Test;
/**
* Tests for {@link org.apache.flink.runtime.instance.Instance}.
*
*/
public class HostInClusterTest {
/**
* Creates a cluster instance of a special test type.
*
* @return a cluster instance of a special test type
*/
private Instance createTestClusterInstance() {
final int ipcPort = ConfigConstants.DEFAULT_TASK_MANAGER_IPC_PORT;
final int dataPort = ConfigConstants.DEFAULT_TASK_MANAGER_DATA_PORT;
InetAddress inetAddress = null;
try {
inetAddress = InetAddress.getByName("127.0.0.1");
} catch (UnknownHostException e) {
fail(e.getMessage());
}
final int numCores = 8;
final int memorySize = 32 * 1024;
final InstanceConnectionInfo instanceConnectionInfo = new InstanceConnectionInfo(inetAddress, ipcPort, dataPort);
final HardwareDescription hardwareDescription = HardwareDescriptionFactory.construct(numCores,
memorySize * 1024L * 1024L, memorySize * 1024L * 1024L);
final NetworkTopology topology = NetworkTopology.createEmptyTopology();
Instance host = new Instance(instanceConnectionInfo, topology.getRootNode(), topology,
hardwareDescription, 8);
return host;
}
/**
* Checks whether the tracking of heart beats is correct.
*/
@Test
public void testHeartBeat() {
// check that heart beat is triggered correctly.
Instance host = createTestClusterInstance();
host.reportHeartBeat();
assertTrue("we have not waited 1 second since last heart beat", host.isStillAlive(1000));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
fail(e.getMessage());
}
assertFalse("we have waited for more than 10 milliseconds", host.isStillAlive(10));
}
/**
* This test covers the internal accounting used by the cluster manager.
*/
@Test
public void testAccounting() {
// check whether the accounting of capacity works correctly
final Instance host = createTestClusterInstance();
final JobID jobID = new JobID();
for (int run = 0; run < 2; ++run) {
// do this twice to check that everything is correctly freed
AllocatedResource[] allocatedSlots = new AllocatedResource[8];
for (int i = 0; i < 8; ++i) {
try {
allocatedSlots[i] = host.allocateSlot(jobID);
}catch(InstanceException ex){
fail(ex.getMessage());
}
assertNotNull(allocatedSlots[i]);
}
// now no resources should be left
boolean instanceException = false;
try{
host.allocateSlot(jobID);
}catch(InstanceException ex){
instanceException = true;
}
assertTrue(instanceException);
for (int i = 0; i < 8; ++i) {
host.releaseSlot(allocatedSlots[i].getAllocationID());
}
}
}
/**
* This test checks if allocated slices inside a cluster instance are removed correctly.
*/
@Test
public void testTermination() {
// check whether the accounting of capacity works correctly if terminateAllInstances is called
final Instance host = createTestClusterInstance();
final JobID jobID = new JobID();
for (int run = 0; run < 2; ++run) {
// do this twice to check that everything is correctly freed
AllocatedResource[] allocatedResources = new AllocatedResource[8];
for (int i = 0; i < 8; ++i) {
try {
allocatedResources[i] = host.allocateSlot(jobID);
}catch (InstanceException ex){
fail(ex.getMessage());
}
assertNotNull(allocatedResources[i]);
}
boolean instanceException = false;
// now no resources should be left
try {
host.allocateSlot(jobID);
} catch (InstanceException ex){
instanceException = true;
}
assertTrue(instanceException);
Collection<AllocatedSlot> allocatedSlots = host.removeAllocatedSlots();
Set<AllocationID> removedAllocationIDs = new HashSet<AllocationID>();
for(AllocatedSlot slot: allocatedSlots){
removedAllocationIDs.add(slot.getAllocationID());
}
final Set<AllocationID> allocationIDs = new HashSet<AllocationID>();
for(int i = 0; i < allocatedResources.length; ++i) {
allocationIDs.add(allocatedResources[i].getAllocationID());
}
//Check if both sets are equal
assertEquals(allocationIDs.size(), removedAllocationIDs.size());
final Iterator<AllocationID> it = allocationIDs.iterator();
while(it.hasNext()) {
assertTrue(removedAllocationIDs.remove(it.next()));
}
assertEquals(0, removedAllocationIDs.size());
}
}
}
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package org.apache.flink.runtime.instance;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.net.InetAddress;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Test;
public class InstanceConnectionInfoTest {
@Test
public void testEqualsHashAndCompareTo() {
try {
// one == four != two != three
InstanceConnectionInfo one = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 10523, 19871);
InstanceConnectionInfo two = new InstanceConnectionInfo(InetAddress.getByName("0.0.0.0"), 10523, 19871);
InstanceConnectionInfo three = new InstanceConnectionInfo(InetAddress.getByName("192.168.0.1"), 7891, 10871);
InstanceConnectionInfo four = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 10523, 19871);
assertTrue(one.equals(four));
assertTrue(!one.equals(two));
assertTrue(!one.equals(three));
assertTrue(!two.equals(three));
assertTrue(!three.equals(four));
assertTrue(one.compareTo(four) == 0);
assertTrue(four.compareTo(one) == 0);
assertTrue(one.compareTo(two) != 0);
assertTrue(one.compareTo(three) != 0);
assertTrue(two.compareTo(three) != 0);
assertTrue(three.compareTo(four) != 0);
{
int val = one.compareTo(two);
assertTrue(two.compareTo(one) == -val);
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testSerialization() {
try {
// without resolved hostname
{
InstanceConnectionInfo original = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 9999, 8888);
InstanceConnectionInfo copy = CommonTestUtils.createCopyWritable(original);
assertEquals(original, copy);
InstanceConnectionInfo serCopy = CommonTestUtils.createCopySerializable(original);
assertEquals(original, serCopy);
}
// with resolved hostname
{
InstanceConnectionInfo original = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 10523, 19871);
original.hostname();
InstanceConnectionInfo copy = CommonTestUtils.createCopyWritable(original);
assertEquals(original, copy);
InstanceConnectionInfo serCopy = CommonTestUtils.createCopySerializable(original);
assertEquals(original, serCopy);
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testGetHostname() {
try {
InstanceConnectionInfo info1 = new InstanceConnectionInfo(InetAddress.getByName("127.0.0.1"), 10523, 19871);
assertTrue(info1.hostname() != null);
InstanceConnectionInfo info2 = new InstanceConnectionInfo(InetAddress.getByName("1.2.3.4"), 9999, 8888);
assertTrue(info2.hostname() != null);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.List;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.io.StringRecord;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.accumulators.AccumulatorEvent;
import org.apache.flink.runtime.client.JobCancelResult;
import org.apache.flink.runtime.client.JobProgressResult;
import org.apache.flink.runtime.client.JobSubmissionResult;
import org.apache.flink.runtime.event.job.AbstractEvent;
import org.apache.flink.runtime.event.job.RecentJobEvent;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertexID;
import org.apache.flink.runtime.executiongraph.InternalJobStatus;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
import org.apache.flink.runtime.io.network.channels.ChannelID;
import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.RPC.Server;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.runtime.jobmanager.DeploymentManager;
import org.apache.flink.runtime.jobmanager.splitassigner.InputSplitWrapper;
import org.apache.flink.runtime.managementgraph.ManagementGraph;
import org.apache.flink.runtime.managementgraph.ManagementVertexID;
import org.apache.flink.runtime.protocols.AccumulatorProtocol;
import org.apache.flink.runtime.protocols.ChannelLookupProtocol;
import org.apache.flink.runtime.protocols.ExtendedManagementProtocol;
import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
import org.apache.flink.runtime.protocols.JobManagerProtocol;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.transferenvelope.RegisterTaskManagerResult;
import org.apache.flink.runtime.types.IntegerRecord;
import org.apache.flink.util.LogUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
public class LocalInstanceManagerTest {
private Server jmServer;
private int port;
@BeforeClass
public static void initLogger() {
LogUtils.initializeDefaultTestConsoleLogger();
}
@Before
public void startJobManagerServer() {
try {
this.port = getAvailablePort();
this.jmServer = RPC.getServer(new EmptyRPCs(), "localhost", this.port, 1);
this.jmServer.start();
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.port);
cfg.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 5);
GlobalConfiguration.includeConfiguration(cfg);
}
catch (Throwable t) {
t.printStackTrace();
fail("Preparing the test (initializing the job manager server) failed.");
}
}
@After
public void stopJobManagerServer() {
try {
this.jmServer.stop();
this.jmServer = null;
this.port = 0;
}
catch (Throwable t) {}
}
@Test
public void testCreateSingleTaskManager() {
try {
LocalInstanceManager li = new LocalInstanceManager(1);
try {
TaskManager[] tms = li.getTaskManagers();
assertEquals(1, tms.length);
assertEquals(ExecutionMode.LOCAL, tms[0].getExecutionMode());
assertTrue(tms[0].getConnectionInfo().address().isLoopbackAddress());
}
finally {
li.shutdown();
TaskManager[] tms = li.getTaskManagers();
assertEquals(0, tms.length);
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testCreateMultipleTaskManagers() {
try {
LocalInstanceManager li = new LocalInstanceManager(2);
try {
TaskManager[] tms = li.getTaskManagers();
assertEquals(2, tms.length);
for (TaskManager tm : tms) {
assertEquals(ExecutionMode.CLUSTER, tm.getExecutionMode());
assertTrue(tm.getConnectionInfo().address().isLoopbackAddress());
}
}
finally {
li.shutdown();
TaskManager[] tms = li.getTaskManagers();
assertEquals(0, tms.length);
}
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
private static final int getAvailablePort() throws IOException {
ServerSocket serverSocket = null;
for (int i = 0; i < 50; i++){
try {
serverSocket = new ServerSocket(0);
int port = serverSocket.getLocalPort();
if (port != 0) {
return port;
}
}
catch (IOException e) {}
finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
throw new IOException("Could not find a free port.");
}
private static final class EmptyRPCs implements DeploymentManager, ExtendedManagementProtocol, InputSplitProviderProtocol,
JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
{
@Override
public JobSubmissionResult submitJob(JobGraph job) {
return null;
}
@Override
public JobProgressResult getJobProgress(JobID jobID) {
return null;
}
@Override
public JobCancelResult cancelJob(JobID jobID) {
return null;
}
@Override
public IntegerRecord getRecommendedPollingInterval() {
return null;
}
@Override
public void reportAccumulatorResult(AccumulatorEvent accumulatorEvent) {}
@Override
public AccumulatorEvent getAccumulatorResults(JobID jobID) {
return null;
}
@Override
public void jobStatusHasChanged(ExecutionGraph executionGraph, InternalJobStatus newJobStatus, String optionalMessage) {}
@Override
public ConnectionInfoLookupResponse lookupConnectionInfo(InstanceConnectionInfo caller, JobID jobID, ChannelID sourceChannelID) {
return null;
}
@Override
public void sendHeartbeat(InstanceConnectionInfo instanceConnectionInfo) {}
@Override
public RegisterTaskManagerResult registerTaskManager(InstanceConnectionInfo instanceConnectionInfo,
HardwareDescription hardwareDescription, IntegerRecord numberOfSlots)
{
return new RegisterTaskManagerResult(RegisterTaskManagerResult.ReturnCode.SUCCESS);
}
@Override
public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {}
@Override
public InputSplitWrapper requestNextInputSplit(JobID jobID, ExecutionVertexID vertexID, IntegerRecord sequenceNumber) {
return null;
}
@Override
public ManagementGraph getManagementGraph(JobID jobID) {
return null;
}
@Override
public List<RecentJobEvent> getRecentJobs() {
return null;
}
@Override
public List<AbstractEvent> getEvents(JobID jobID) {
return null;
}
@Override
public void killTask(JobID jobID, ManagementVertexID id) {}
@Override
public void killInstance(StringRecord instanceName) {}
@Override
public void logBufferUtilization(JobID jobID) {}
@Override
public int getAvailableSlots() {
return 0;
}
@Override
public void deploy(JobID jobID, Instance instance, List<ExecutionVertex> verticesToBeDeployed) {}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.instance.AllocatedResource;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.jobgraph.JobID;
/**
* This class implements an instance listener that can be used for unit tests.
* <p>
* This class is thread-safe.
*
*/
public class TestInstanceListener implements InstanceListener {
/**
* Stores the resources allocates
*/
final Map<JobID, List<AllocatedResource>> resourcesOfJobs = new HashMap<JobID, List<AllocatedResource>>();
@Override
public synchronized void resourcesAllocated(final JobID jobID, final List<AllocatedResource> allocatedResources) {
List<AllocatedResource> allocatedResourcesOfJob = this.resourcesOfJobs.get(jobID);
if (allocatedResourcesOfJob == null) {
allocatedResourcesOfJob = new ArrayList<AllocatedResource>();
this.resourcesOfJobs.put(jobID, allocatedResourcesOfJob);
}
for (final AllocatedResource allocatedResource : allocatedResources) {
if (allocatedResourcesOfJob.contains(allocatedResource)) {
throw new IllegalStateException("Resource " + allocatedResource.getAllocationID()
+ " is already allocated by job " + jobID);
}
allocatedResourcesOfJob.add(allocatedResource);
}
}
@Override
public synchronized void allocatedResourcesDied(final JobID jobID, final List<AllocatedResource> allocatedResources) {
List<AllocatedResource> allocatedResourcesOfJob = this.resourcesOfJobs.get(jobID);
if (allocatedResourcesOfJob == null) {
throw new IllegalStateException("Unable to find allocated resources for job with ID " + jobID);
}
for (final AllocatedResource allocatedResource : allocatedResources) {
if (!allocatedResourcesOfJob.remove(allocatedResource)) {
throw new IllegalStateException("Resource " + allocatedResource.getAllocationID()
+ " is not assigned to job " + jobID);
}
}
if (allocatedResourcesOfJob.isEmpty()) {
this.resourcesOfJobs.remove(jobID);
}
}
/**
* Returns the number of allocated resources for the job with the given job ID.
*
* @param jobID
* the job ID specifying the job
* @return the number of allocated resources for the job
*/
public synchronized int getNumberOfAllocatedResourcesForJob(JobID jobID) {
final List<AllocatedResource> allocatedResources = this.resourcesOfJobs.get(jobID);
if (allocatedResources == null) {
return 0;
}
return allocatedResources.size();
}
/**
* Returns a list of resources allocated for the given job.
*
* @param jobID
* the job ID specifying the job
* @return the (possibly empty) list of resource allocated for the job
*/
public synchronized List<AllocatedResource> getAllocatedResourcesForJob(JobID jobID) {
final List<AllocatedResource> allocatedResources = this.resourcesOfJobs.get(jobID);
if (allocatedResources == null) {
return new ArrayList<AllocatedResource>();
}
return new ArrayList<AllocatedResource>(allocatedResources);
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance.local;
import org.junit.Assert;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.ExecutionMode;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.junit.Test;
public class LocalInstanceManagerTest {
/**
* Checks if the local instance manager reads the default correctly from the configuration file.
*/
@Test
public void testInstanceTypeFromConfiguration() {
try {
Configuration cfg = new Configuration();
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "127.0.0.1");
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123);
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
cfg.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
GlobalConfiguration.includeConfiguration(cfg);
// start JobManager
ExecutionMode executionMode = ExecutionMode.LOCAL;
JobManager jm = new JobManager(executionMode);
final TestInstanceListener testInstanceListener = new TestInstanceListener();
InstanceManager im = jm.getInstanceManager();
try {
im.setInstanceListener(testInstanceListener);
} catch (Exception e) {
e.printStackTrace();
Assert.fail("Instantiation of LocalInstanceManager failed: " + e.getMessage());
} finally {
jm.shutdown();
}
}
catch (Exception e) {
System.err.println(e.getMessage());
e.printStackTrace();
Assert.fail("Test caused an error: " + e.getMessage());
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.instance.local;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.runtime.instance.AllocatedResource;
import org.apache.flink.runtime.instance.InstanceListener;
import org.apache.flink.runtime.jobgraph.JobID;
/**
* This class implements an instance listener that can be used for unit tests.
* <p>
* This class is thread-safe.
*
*/
public final class TestInstanceListener implements InstanceListener {
/**
* Stores the resources allocates
*/
final Map<JobID, List<AllocatedResource>> resourcesOfJobs = new HashMap<JobID, List<AllocatedResource>>();
@Override
public synchronized void resourcesAllocated(JobID jobID, List<AllocatedResource> allocatedResources) {
List<AllocatedResource> allocatedResourcesOfJob = this.resourcesOfJobs.get(jobID);
if (allocatedResourcesOfJob == null) {
allocatedResourcesOfJob = new ArrayList<AllocatedResource>();
this.resourcesOfJobs.put(jobID, allocatedResourcesOfJob);
}
for (final AllocatedResource allocatedResource : allocatedResources) {
if (allocatedResourcesOfJob.contains(allocatedResource)) {
throw new IllegalStateException("Resource " + allocatedResource.getAllocationID()
+ " is already allocated by job " + jobID);
}
allocatedResourcesOfJob.add(allocatedResource);
}
}
@Override
public synchronized void allocatedResourcesDied(JobID jobID, List<AllocatedResource> allocatedResources) {
final List<AllocatedResource> allocatedResourcesOfJob = this.resourcesOfJobs.get(jobID);
if (allocatedResourcesOfJob == null) {
throw new IllegalStateException("Unable to find allocated resources for job with ID " + jobID);
}
for (final AllocatedResource allocatedResource : allocatedResources) {
if (!allocatedResourcesOfJob.remove(allocatedResource)) {
throw new IllegalStateException("Resource " + allocatedResource.getAllocationID()
+ " is not assigned to job " + jobID);
}
}
if (allocatedResourcesOfJob.isEmpty()) {
this.resourcesOfJobs.remove(jobID);
}
}
/**
* Returns the number of allocated resources for the job with the given job ID.
*
* @param jobID
* the job ID specifying the job
* @return the number of allocated resources for the job
*/
public synchronized int getNumberOfAllocatedResourcesForJob(JobID jobID) {
final List<AllocatedResource> allocatedResources = this.resourcesOfJobs.get(jobID);
if (allocatedResources == null) {
return 0;
}
return allocatedResources.size();
}
/**
* Returns a list of resources allocated for the given job.
*
* @param jobID
* the job ID specifying the job
* @return the (possibly empty) list of resource allocated for the job
*/
public synchronized List<AllocatedResource> getAllocatedResourcesForJob(JobID jobID) {
final List<AllocatedResource> allocatedResources = this.resourcesOfJobs.get(jobID);
if (allocatedResources == null) {
return new ArrayList<AllocatedResource>();
}
return new ArrayList<AllocatedResource>(allocatedResources);
}
}
......@@ -27,6 +27,8 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
......@@ -35,8 +37,7 @@ import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
/**
* This class contains auxiliary methods for unit tests in the Nephele common module.
*
* This class contains auxiliary methods for unit tests.
*/
public class CommonTestUtils {
......@@ -82,7 +83,6 @@ public class CommonTestUtils {
* @return the path to the directory for temporary files
*/
public static String getTempDir() {
return GlobalConfiguration.getString(ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH).split(File.pathSeparator)[0];
}
......@@ -98,7 +98,7 @@ public class CommonTestUtils {
* thrown if an error occurs while creating the copy of the object
*/
@SuppressWarnings("unchecked")
public static <T extends IOReadableWritable> T createCopy(final T original) throws IOException {
public static <T extends IOReadableWritable> T createCopyWritable(final T original) throws IOException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos);
......@@ -106,9 +106,6 @@ public class CommonTestUtils {
original.write(new OutputViewDataOutputStreamWrapper(dos));
final String className = original.getClass().getName();
if (className == null) {
fail("Class name is null");
}
Class<T> clazz = null;
......@@ -118,28 +115,68 @@ public class CommonTestUtils {
fail(e.getMessage());
}
if (clazz == null) {
fail("Cannot find class with name " + className);
}
T copy = null;
try {
copy = clazz.newInstance();
} catch (InstantiationException e) {
fail(e.getMessage());
} catch (IllegalAccessException e) {
fail(e.getMessage());
}
if (copy == null) {
fail("Copy of object of type " + className + " is null");
} catch (Throwable t) {
t.printStackTrace();
fail(t.getMessage());
}
final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
final DataInputStream dis = new DataInputStream(bais);
copy.read(new InputViewDataInputStreamWrapper(dis));
if (dis.available() > 0) {
throw new IOException("The coped result was not fully consumed.");
}
return copy;
}
@SuppressWarnings("unchecked")
public static <T extends java.io.Serializable> T createCopySerializable(T original) throws IOException {
if (original == null) {
throw new IllegalArgumentException();
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(original);
oos.close();
baos.close();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
ObjectInputStream ois = new ObjectInputStream(bais);
T copy;
try {
copy = (T) ois.readObject();
}
catch (ClassNotFoundException e) {
throw new IOException(e);
}
ois.close();
bais.close();
return copy;
}
public static void sleepUninterruptibly(long msecs) {
long now = System.currentTimeMillis();
long sleepUntil = now + msecs;
long remaining;
while ((remaining = sleepUntil - now) > 0) {
try {
Thread.sleep(remaining);
}
catch (InterruptedException e) {}
now = System.currentTimeMillis();
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.topology;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import org.apache.flink.runtime.testutils.ManagementTestUtils;
import org.apache.flink.runtime.topology.NetworkNode;
import org.apache.flink.runtime.topology.NetworkTopology;
import org.junit.Test;
/**
* This class contains unit tests which address the correctness of the network topology implementation.
*
*/
public class NetworkTopologyTest {
/**
* This test checks some basic functionality like loading a network topology from a file, looking up network nodes
* in the topology by their name, or determining the depth of network node with respect to the topology.
*/
@Test
public void testNetworkTopology() {
NetworkTopology topology;
try {
topology = NetworkTopology.fromFile(getPathToTopologyFile());
} catch (IOException e) {
fail(e.getMessage());
return;
}
assertEquals(topology.getDepth(), 4);
final NetworkNode networkNode = topology.getNodeByName("rackswitch3");
if (networkNode == null) {
assertEquals("Cannot find network node rackswitch3", true, false);
return;
}
assertEquals(networkNode.getNumberOfChildNodes(), 4);
}
/**
* This test checks the distance computation between two network nodes within a network topology.
*/
@Test
public void testTopologyDistance() {
NetworkTopology topology;
try {
topology = NetworkTopology.fromFile(getPathToTopologyFile());
} catch (IOException e) {
fail(e.getMessage());
return;
}
final NetworkNode node01 = topology.getNodeByName("node01");
final NetworkNode node02 = topology.getNodeByName("node02");
final NetworkNode node05 = topology.getNodeByName("node05");
final NetworkNode node11 = topology.getNodeByName("node11");
final NetworkNode rackswitch1 = topology.getNodeByName("rackswitch1");
final NetworkNode mainswitch1 = topology.getNodeByName("mainswitch1");
// Check if the network node lookup works correctly
assertNotNull(node01);
assertNotNull(node02);
assertNotNull(node05);
assertNotNull(node11);
assertNotNull(rackswitch1);
assertNotNull(mainswitch1);
assertNull(topology.getNodeByName("nonexistant node"));
// Check the various distances between the nodes
assertEquals(0, node01.getDistance(node01));
assertEquals(2, node01.getDistance(node02));
assertEquals(4, node01.getDistance(node05));
assertEquals(6, node01.getDistance(node11));
assertEquals(1, node01.getDistance(rackswitch1));
assertEquals(1, rackswitch1.getDistance(node01));
assertEquals(2, mainswitch1.getDistance(node02));
assertEquals(3, topology.getRootNode().getDistance(node11));
// Finally, create a new topology a new node and make sure the distance computation returns the correct
// result
final NetworkTopology otherTopology = new NetworkTopology();
final NetworkNode nodeFromOtherTopology = new NetworkNode("node from other topology",
otherTopology.getRootNode(), otherTopology);
assertEquals(Integer.MAX_VALUE, node01.getDistance(nodeFromOtherTopology));
// topology2.addNode(new NetworkNode(", parentNode, networkTopology))
}
/**
* Locates the file which contains the network topology. The network topology file is required for these tests.
*
* @return a file object pointing to the network topology file
* @throws IOException
* thrown if the network topology could not be located within the test resources
*/
private File getPathToTopologyFile() throws IOException {
final File topology = ManagementTestUtils.locateResource("topology.txt");
if (topology != null) {
return topology;
}
throw new IOException("Cannot locate topology file to perform unit tests");
}
}
......@@ -61,7 +61,7 @@ public class StringRecordTest {
try {
final StringRecord copy = (StringRecord) CommonTestUtils.createCopy(orig);
final StringRecord copy = (StringRecord) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getLength(), copy.getLength());
assertEquals(orig.toString(), copy.toString());
......
......@@ -50,7 +50,7 @@ public class TypeTest {
assertEquals(orig.getDataBuffer().length, 2 * data.length);
try {
final FileRecord copy = (FileRecord) CommonTestUtils.createCopy(orig);
final FileRecord copy = (FileRecord) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getFileName(), copy.getFileName());
assertEquals(orig, copy);
......@@ -72,7 +72,7 @@ public class TypeTest {
try {
final IntegerRecord copy = (IntegerRecord) CommonTestUtils.createCopy(orig);
final IntegerRecord copy = (IntegerRecord) CommonTestUtils.createCopyWritable(orig);
assertEquals(orig.getValue(), copy.getValue());
assertEquals(orig, copy);
......
/***********************************************************************************************************************
*
* Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
**********************************************************************************************************************/
package org.apache.flink.runtime.util;
import static org.junit.Assert.*;
import org.junit.Test;
public class EnvironmentInformationTest {
@Test
public void testJavaMemory() {
try {
long fullHeap = EnvironmentInformation.getMaxJvmHeapMemory();
long free = EnvironmentInformation.getSizeOfFreeHeapMemory();
long freeWithGC = EnvironmentInformation.getSizeOfFreeHeapMemoryWithDefrag();
assertTrue(fullHeap > 0);
assertTrue(free > 0);
assertTrue(freeWithGC > 0);
assertTrue(free <= fullHeap);
assertTrue(freeWithGC <= fullHeap);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
@Test
public void testEnvironmentMethods() {
try {
assertNotNull(EnvironmentInformation.getJvmStartupOptions());
assertNotNull(EnvironmentInformation.getJvmVersion());
assertNotNull(EnvironmentInformation.getRevisionInformation());
assertNotNull(EnvironmentInformation.getVersion());
assertNotNull(EnvironmentInformation.getUserRunning());
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
......@@ -28,9 +28,9 @@ public class WordCountITCase extends JavaProgramTestBase {
protected String resultPath;
public WordCountITCase(){
setDegreeOfParallelism(4);
setNumTaskTracker(2);
setTaskManagerNumSlots(2);
// setDegreeOfParallelism(4);
// setNumTaskTracker(2);
// setTaskManagerNumSlots(2);
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册