提交 3c1bbf26 编写于 作者: G Gary Yao

[hotfix][runtime] Refactor TaskManagerRunner#createRpcService

Extract method determineTaskManagerHostname.
上级 85c3e7e2
......@@ -36,6 +36,7 @@ import org.apache.flink.runtime.entrypoint.parser.CommandLineParser;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
......@@ -400,30 +401,45 @@ public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync
* @param haServices to use for the task manager hostname retrieval
*/
public static RpcService createRpcService(
final Configuration configuration,
final HighAvailabilityServices haServices) throws Exception {
final Configuration configuration,
final HighAvailabilityServices haServices) throws Exception {
checkNotNull(configuration);
checkNotNull(haServices);
String taskManagerHostname = configuration.getString(TaskManagerOptions.HOST);
final String taskManagerHostname = determineTaskManagerHostname(configuration, haServices);
final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
if (taskManagerHostname != null) {
LOG.info("Using configured hostname/address for TaskManager: {}.", taskManagerHostname);
} else {
Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration);
}
InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
haServices.getResourceManagerLeaderRetriever(),
lookupTimeout);
private static String determineTaskManagerHostname(
final Configuration configuration,
final HighAvailabilityServices haServices) throws LeaderRetrievalException {
taskManagerHostname = taskManagerAddress.getHostName();
final String configuredTaskManagerHostname = configuration.getString(TaskManagerOptions.HOST);
LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
taskManagerHostname, taskManagerAddress.getHostAddress());
if (configuredTaskManagerHostname != null) {
LOG.info("Using configured hostname/address for TaskManager: {}.", configuredTaskManagerHostname);
return configuredTaskManagerHostname;
} else {
return determineTaskManagerHostnameByConnectingToResourceManager(configuration, haServices);
}
}
final String portRangeDefinition = configuration.getString(TaskManagerOptions.RPC_PORT);
return AkkaRpcServiceUtils.createRpcService(taskManagerHostname, portRangeDefinition, configuration);
private static String determineTaskManagerHostnameByConnectingToResourceManager(
final Configuration configuration,
final HighAvailabilityServices haServices) throws LeaderRetrievalException {
final Time lookupTimeout = Time.milliseconds(AkkaUtils.getLookupTimeout(configuration).toMillis());
final InetAddress taskManagerAddress = LeaderRetrievalUtils.findConnectingAddress(
haServices.getResourceManagerLeaderRetriever(),
lookupTimeout);
LOG.info("TaskManager will use hostname/address '{}' ({}) for communication.",
taskManagerAddress.getHostName(), taskManagerAddress.getHostAddress());
return taskManagerAddress.getHostName();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册