提交 94a66d57 编写于 作者: S Stephan Ewen

[FLINK-1631] [jobmanager] Deactivate web frontend in parallel tests (prevent port collisions)

上级 a1162b70
......@@ -247,7 +247,7 @@ TaskManager hearbeat may be missing before the TaskManager is considered failed.
- `jobmanager.web.port`: Port of the JobManager's web interface that displays
status of running jobs and execution time breakdowns of finished jobs
(DEFAULT: 8081).
(DEFAULT: 8081). Setting this value to `-1` disables the web frontend.
- `jobmanager.web.history`: The number of latest jobs that the JobManager's web
front-end in its history (DEFAULT: 5).
......
......@@ -234,7 +234,7 @@ public final class ConfigConstants {
// ------------------------- JobManager Web Frontend ----------------------
/**
* The port for the pact web-frontend server.
* The port for the runtime monitor web-frontend server.
*/
public static final String JOB_MANAGER_WEB_PORT_KEY = "jobmanager.web.port";
......
......@@ -29,7 +29,7 @@ jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: -1
taskmanager.numberOfTaskSlots: 1
parallelization.degree.default: 1
......@@ -37,8 +37,14 @@ parallelization.degree.default: 1
# Web Frontend
#==============================================================================
# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.
jobmanager.web.port: 8081
# The port uder which the standalone web client
# (for job upload and submit) listens.
webclient.port: 8080
#==============================================================================
......
......@@ -42,7 +42,6 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import scala.concurrent.duration.FiniteDuration;
/**
* This class sets up a web-server that contains a web frontend to display information about running jobs.
* It instantiates and configures an embedded jetty server.
......@@ -67,14 +66,16 @@ public class WebInfoServer {
/**
* Port for info server
*/
private int port;
private final int port;
/**
* Creates a new web info server. The server runs the servlets that implement the logic
* to list all present information concerning the job manager
*
* @param config
* The configuration for the flink job manager.
* @param config The Flink configuration.
* @param jobmanager The ActorRef to the JobManager actor
* @param archive The ActorRef to the archive for old jobs
*
* @throws IOException
* Thrown, if the server setup failed for an I/O related reason.
*/
......@@ -86,10 +87,13 @@ public class WebInfoServer {
throw new NullPointerException();
}
final FiniteDuration timeout = AkkaUtils.getTimeout(config);
this.port = config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY,
ConfigConstants.DEFAULT_JOB_MANAGER_WEB_FRONTEND_PORT);
if (this.port <= 0) {
throw new IllegalArgumentException("Invalid port for the webserver: " + this.port);
}
final FiniteDuration timeout = AkkaUtils.getTimeout(config);
// get base path of Flink installation
final String basePath = config.getString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, "");
......@@ -117,7 +121,6 @@ public class WebInfoServer {
server = new Server(port);
// ----- the handlers for the servlets -----
ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
servletContext.setContextPath("/");
......@@ -197,8 +200,7 @@ public class WebInfoServer {
server.stop();
}
public Server getServer() {
return server;
public int getServerPort() {
return this.port;
}
}
......@@ -726,9 +726,11 @@ object JobManager {
}
// start the job manager web frontend
LOG.info("Starting JobManger web frontend")
val webServer = new WebInfoServer(configuration, jobManager, archiver)
webServer.start()
if (configuration.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
LOG.info("Starting JobManger web frontend")
val webServer = new WebInfoServer(configuration, jobManager, archiver)
webServer.start()
}
}
catch {
case t: Throwable => {
......
......@@ -26,6 +26,7 @@ import static org.apache.flink.runtime.testutils.CommonTestUtils.isProcessAlive;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
......@@ -178,7 +179,11 @@ public class JobManagerProcessReapingTest {
public static void main(String[] args) {
try {
int port = Integer.parseInt(args[0]);
JobManager.runJobManager(new Configuration(), ExecutionMode.CLUSTER(), "localhost", port);
Configuration config = new Configuration();
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1);
JobManager.runJobManager(config, ExecutionMode.CLUSTER(), "localhost", port);
System.exit(0);
}
catch (Throwable t) {
......
......@@ -41,6 +41,7 @@ class TestingCluster(userConfiguration: Configuration, singleActorSystem: Boolea
cfg.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost")
cfg.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, NetUtils.getAvailablePort())
cfg.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 10)
cfg.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
cfg.addAll(userConfig)
cfg
......
......@@ -65,6 +65,8 @@ class ForkableFlinkMiniCluster(userConfiguration: Configuration, singleActorSyst
config.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, taskManagerData)
}
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
super.generateConfiguration(config)
}
......
......@@ -93,12 +93,14 @@ object ApplicationMaster {
val jobManagerPort = extActor.provider.getDefaultAddress.port.get
// start the web info server
LOG.info("Starting Job Manger web frontend.")
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
webserver = new WebInfoServer(config, jobManager, archiver)
webserver.start()
if (config.getInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) != -1) {
LOG.info("Starting Job Manger web frontend.")
config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logDirs)
webserver = new WebInfoServer(config, jobManager, archiver)
webserver.start()
}
val jobManagerWebPort = webserver.getServer.getConnectors()(0).getLocalPort
val jobManagerWebPort = if (webserver == null) -1 else webserver.getServerPort
// generate configuration file for TaskManagers
generateConfigurationFile(s"$currDir/$MODIFIED_CONF_FILE", currDir, ownHostname,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册