提交 23e30f09 编写于 作者: R Robert Metzger

[FLINK-1170] address pull request review comments

This closes #158
上级 b6f599f1
......@@ -180,8 +180,7 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
return false;
}
for (String h : hosts) {
final String hadoopHost = NetUtils.getHostnameFromFQDN(h.toLowerCase());
if (h != null && hadoopHost.toLowerCase().equals(flinkHost)) {
if (h != null && NetUtils.getHostnameFromFQDN(h.toLowerCase()).equals(flinkHost)) {
return true;
}
}
......
......@@ -26,6 +26,9 @@ public class NetUtils {
* @return
*/
public static String getHostnameFromFQDN(String fqdn) {
if(fqdn == null) {
throw new IllegalArgumentException("Input string is null (fqdn)");
}
int dotPos = fqdn.indexOf('.');
if(dotPos == -1) {
return fqdn;
......
......@@ -63,7 +63,13 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
/**
* The hostname
*/
private String hostname;
private String hostName;
/**
* This flag indicates if the FQDN hostname cound not be resolved and is represented
* as an IP address (string).
*/
private boolean fqdnHostNameIsIP = false;
/**
......@@ -94,6 +100,19 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
this.ipcPort = ipcPort;
this.dataPort = dataPort;
this.inetAddress = inetAddress;
// get FQDN hostname on this TaskManager.
try {
this.fqdnHostName = this.inetAddress.getCanonicalHostName();
} catch (Throwable t) {
LOG.warn("Unable to determine hostname for TaskManager. The performance might be degraded since HDFS input split assignment is not possible");
if(LOG.isDebugEnabled()) {
LOG.debug("getCanonicalHostName() Exception", t);
}
// could not determine host name, so take IP textual representation
this.fqdnHostName = inetAddress.getHostAddress();
this.fqdnHostNameIsIP = true;
}
}
/**
......@@ -135,27 +154,19 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
* @return the host name of the instance
*/
public String getFQDNHostname() {
if (this.fqdnHostName == null) {
try {
this.fqdnHostName = this.inetAddress.getCanonicalHostName();
} catch (Throwable t) {
LOG.warn("Unable to determine hostname for TaskManager. The performance might be degraded since HDFS input split assignment is not possible");
if(LOG.isDebugEnabled()) {
LOG.debug("getCanonicalHostName() Exception", t);
}
// could not determine host name, so take IP textual representation
this.fqdnHostName = inetAddress.getHostAddress();
}
}
return this.fqdnHostName;
}
public String getHostname() {
if(hostname == null) {
if(hostName == null) {
String fqdn = getFQDNHostname();
hostname = NetUtils.getHostnameFromFQDN(fqdn);
if(this.fqdnHostNameIsIP) { // fqdn to hostname translation is pointless if FQDN is an ip address.
hostName = fqdn;
} else {
hostName = NetUtils.getHostnameFromFQDN(fqdn);
}
}
return hostname;
return hostName;
}
public String getInetAdress() {
......@@ -177,6 +188,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
this.dataPort = in.readInt();
this.fqdnHostName = StringUtils.readNullableString(in);
this.hostName = StringUtils.readNullableString(in);
this.fqdnHostNameIsIP = in.readBoolean();
try {
this.inetAddress = InetAddress.getByAddress(address);
......@@ -195,6 +208,8 @@ public class InstanceConnectionInfo implements IOReadableWritable, Comparable<In
out.writeInt(this.dataPort);
StringUtils.writeNullableString(fqdnHostName, out);
StringUtils.writeNullableString(hostName, out);
out.writeBoolean(fqdnHostNameIsIP);
}
// --------------------------------------------------------------------------------------------
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册