提交 ec35d1c5 编写于 作者: T Till Rohrmann

Added the webserver to the job manager.

上级 de88a10d
......@@ -108,7 +108,7 @@ public class WebInfoServer {
LOG.info("Setting up web info server, using web-root directory '" + webDir.getAbsolutePath() + "'.");
//LOG.info("Web info server will store temporary files in '" + tmpDir.getAbsolutePath());
LOG.info("Web info server will display information about nephele job-manager on "
LOG.info("Web info server will display information about flink job-manager on "
+ config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null) + ", port "
+ port
+ ".");
......
......@@ -29,6 +29,7 @@ import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.runtime.blob.BlobServer
import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
import org.apache.flink.runtime.messages.ExecutionGraphMessages.{CurrentJobStatus,
JobStatusChanged}
......@@ -70,7 +71,6 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {
val instanceManager = new InstanceManager()
val scheduler = new FlinkScheduler()
val libraryCacheManager = new BlobLibraryCacheManager(new BlobServer(), cleanupInterval)
val webserver = null
val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
val finalJobStatusListener = scala.collection.mutable.HashMap[JobID, Set[ActorRef]]()
......@@ -429,8 +429,7 @@ object JobManager {
}
def startActor(archiveCount: Int, profilingEnabled: Boolean, cleanupInterval: Long)
(implicit actorSystem:
ActorSystem): ActorRef = {
(implicit actorSystem: ActorSystem): ActorRef = {
actorSystem.actorOf(Props(classOf[JobManager], archiveCount, profilingEnabled, cleanupInterval),
JOB_MANAGER_NAME)
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.jobmanager
import akka.actor.Actor
import org.apache.flink.runtime.jobmanager.web.WebInfoServer
trait WithWebServer extends Actor {
that: JobManager =>
val webserver = new WebInfoServer(,,self, archive)
webserver.start()
abstract override def postStop(): Unit = {
that.postStop()
webserver.stop()
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册