提交 933609f0 编写于 作者: S Stephan Ewen

[FLINK-1608] [taskmanager] Hostname/address for TaskManager can be specified in the configuration

上级 086209ca
...@@ -80,6 +80,11 @@ public final class ConfigConstants { ...@@ -80,6 +80,11 @@ public final class ConfigConstants {
*/ */
public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval"; public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval";
/**
* The config parameter defining the task manager's hostname.
*/
public static final String TASK_MANAGER_HOSTNAME_KEY = "taskmanager.hostname";
/** /**
* The config parameter defining the task manager's IPC port from the configuration. * The config parameter defining the task manager's IPC port from the configuration.
*/ */
......
...@@ -834,11 +834,18 @@ object TaskManager { ...@@ -834,11 +834,18 @@ object TaskManager {
val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration) val (jobManagerHostname, jobManagerPort) = getAndCheckJobManagerAddress(configuration)
var taskManagerHostname = configuration.getString(
ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, null)
if (taskManagerHostname != null) {
LOG.info("Using configured hostname/address for TaskManager: " + taskManagerHostname)
}
else {
// try to find out the hostname of the interface from which the TaskManager // try to find out the hostname of the interface from which the TaskManager
// can connect to the JobManager. This involves a reverse name lookup // can connect to the JobManager. This involves a reverse name lookup
LOG.info("Trying to determine network interface and address/hostname to use") LOG.info("Trying to select the network interface and address/hostname to use")
val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort) val jobManagerAddress = new InetSocketAddress(jobManagerHostname, jobManagerPort)
val taskManagerHostname = try { taskManagerHostname = try {
NetUtils.resolveAddress(jobManagerAddress).getHostName() NetUtils.resolveAddress(jobManagerAddress).getHostName()
} }
catch { catch {
...@@ -847,6 +854,7 @@ object TaskManager { ...@@ -847,6 +854,7 @@ object TaskManager {
} }
LOG.info("TaskManager will use hostname/address '{}' for communication.", taskManagerHostname) LOG.info("TaskManager will use hostname/address '{}' for communication.", taskManagerHostname)
}
// if no task manager port has been configured, use 0 (system will pick any free port) // if no task manager port has been configured, use 0 (system will pick any free port)
val actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0) val actorSystemPort = configuration.getInteger(ConfigConstants.TASK_MANAGER_IPC_PORT_KEY, 0)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册