提交 f6051277 编写于 作者: R Robert Metzger

[YARN] Fix issue with port-offsetting.

上级 d749c24d
......@@ -450,7 +450,7 @@ public class Client {
*/
int appNumber = appId.getId();
jmPort += appNumber;
jmPort = Utils.offsetPort(jmPort, appNumber);
// Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class);
......@@ -492,7 +492,7 @@ public class Client {
fs.close();
int amRPCPort = GlobalConfiguration.getInteger(ConfigConstants.YARN_AM_PRC_PORT, ConfigConstants.DEFAULT_YARN_AM_RPC_PORT);
amRPCPort += appNumber;
amRPCPort = Utils.offsetPort(amRPCPort, appNumber);
// Setup CLASSPATH for ApplicationMaster
Map<String, String> appMasterEnv = new HashMap<String, String>();
Utils.setupEnv(conf, appMasterEnv);
......
......@@ -106,13 +106,12 @@ public class ClientMasterControl extends Thread {
}
}
public boolean shutdownAM() {
public void shutdownAM() {
try {
boolean result = cmp.shutdownAM().getValue();
return result;
cmp.shutdownAM();
} catch(Throwable e) {
LOG.warn("Error shutting down the application master", e);
return false;
// the old RPC service is unable to shut down itself. So the java.io.EOFException is expected here.
LOG.debug("This exception is expected", e);
}
}
......
......@@ -266,4 +266,23 @@ public class Utils {
environment.put(StringInterner.weakIntern(variable),
StringInterner.weakIntern(val));
}
/**
 * Offsets a configured port by the YARN application id, so that two ApplicationMasters which
 * YARN allocates on the same physical hardware do not collide on the same port.
 *
 * <p>Valid ports are 1024 - 65535. The offset is the application id modulo 1000, i.e. a value
 * in the range 0 - 999. Only the upper bound is enforced here:
 * <ul>
 *   <li>a port above 65535 is replaced by 64535 before the offset is applied;</li>
 *   <li>if port plus offset would exceed 65535, the port is lowered by 1000 first.</li>
 * </ul>
 * The lower bound (1024) is not checked; callers are expected to pass a sensible base port.
 *
 * @param port the configured base port
 * @param appId the numeric id of the YARN application
 * @return the base port (possibly corrected as described above) plus the offset
 */
public static int offsetPort(int port, int appId) {
	final int maxPort = 65535;
	final int offsetRange = 1000;
	final int fallbackPort = maxPort - offsetRange; // 64535

	// In Java the sign of '%' follows the dividend. Taking the absolute value keeps the
	// offset within 0 - 999 even for a negative id, so the port is never shifted downwards.
	final int offset = Math.abs(appId % offsetRange);

	if (port > maxPort) {
		LOG.warn("The specified YARN RPC port (" + port + ") is above the maximum possible port "
				+ maxPort + ". Setting it to " + fallbackPort);
		port = fallbackPort;
	}
	if (port + offset > maxPort) {
		// Report the value that is actually applied (port - 1000), not a fixed constant.
		final int loweredPort = port - offsetRange;
		LOG.warn("The specified YARN RPC port (" + port + ") is, when offset by the ApplicationID ("
				+ appId + "), above the maximum possible port " + maxPort
				+ ". Setting it to " + loweredPort);
		port = loweredPort;
	}
	return port + offset;
}
}
......@@ -18,7 +18,6 @@
package org.apache.flink.yarn.appMaster;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
......@@ -44,7 +43,6 @@ import org.apache.flink.runtime.ipc.RPC;
import org.apache.flink.runtime.ipc.RPC.Server;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.util.SerializableArrayList;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.util.StringUtils;
import org.apache.flink.yarn.Client;
import org.apache.flink.yarn.Utils;
......@@ -220,12 +218,13 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
// determine JobManager port
int port = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
if(port != -1) {
port += appNumber;
port = Utils.offsetPort(port,appNumber);
} else {
LOG.warn("JobManager port is unknown");
}
this.jobManagerPort = port;
this.jobManagerWebPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT)+appNumber;
int jmWebPort = GlobalConfiguration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
this.jobManagerWebPort = Utils.offsetPort(jmWebPort, appNumber);
}
private void setFailed(boolean failed) {
......@@ -506,7 +505,7 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
@Override
public BooleanValue shutdownAM() throws Exception {
public void shutdownAM() throws Exception {
LOG.info("Client requested shutdown of AM");
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
String finalMessage = "";
......@@ -518,7 +517,6 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
}
rmClient.unregisterApplicationMaster(finalStatus, finalMessage, "");
this.close();
return new BooleanValue(true);
}
private void close() throws Exception {
......@@ -527,12 +525,15 @@ public class ApplicationMaster implements YARNClientMasterProtocol {
nmClient.close();
rmClient.close();
if(!isFailed) {
// amRpcServer.stop();
amRpcServer.stop();
} else {
LOG.warn("Can not close AM RPC connection since the AM is in failed state");
}
} else {
LOG.warn("The AM has already been closed before");
}
this.isClosed = true;
System.exit(0); // kill it hard.
}
@Override
......
......@@ -26,7 +26,6 @@ import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.protocols.VersionedProtocol;
import org.apache.flink.types.BooleanValue;
/**
......@@ -71,7 +70,7 @@ public interface YARNClientMasterProtocol extends VersionedProtocol {
ApplicationMasterStatus getAppplicationMasterStatus();
BooleanValue shutdownAM() throws Exception;
void shutdownAM() throws Exception;
List<Message> getMessages();
......
......@@ -279,6 +279,13 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
this.scheduler.shutdown();
}
if(this.server != null) {
try {
this.server.stop();
} catch (Exception e1) {
throw new RuntimeException("Error shtopping the web-info-server.", e1);
}
}
this.isShutDown = true;
LOG.debug("Shutdown of job manager completed");
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册