diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index d83f2cd90774affec8c1b75a506d402fc0baae37..327e2a37f0b564f8170a66c96e8ef016e83bcd4e 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -18,13 +18,14 @@ package org.apache.flink.runtime.jobmanager
 
+import java.io.IOException
+import java.net.URI
 import java.util
 
 import akka.actor.ActorRef
 import grizzled.slf4j.Logger
 import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.core.fs.Path
+import org.apache.flink.core.fs.{FileSystem, Path}
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, Executio
 import org.apache.flink.runtime.history.FsJobArchivist
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
-import org.apache.flink.runtime.state.filesystem.FsStateBackend
 
 import scala.collection.mutable
 import scala.concurrent.future
@@ -86,7 +86,7 @@ class MemoryArchivist(
   }
 
   override def handleMessage: Receive = {
-    
+
     /* Receive Execution Graph to archive */
     case ArchiveExecutionGraph(jobID, graph) =>
       // Keep lru order in case we override a graph (from multiple job submission in one session).
@@ -109,7 +109,7 @@
       trimHistory()
 
     case msg : InfoMessage => handleWebServerInfoMessage(msg, sender())
-    
+
     case RequestArchivedJob(jobID: JobID) =>
       val graph = graphs.get(jobID)
       sender ! decorateMessage(ArchivedJob(graph))
@@ -165,7 +165,7 @@
       throw new RuntimeException("Received unknown message " + message)
   }
 
-  
+
   private def handleWebServerInfoMessage(message: InfoMessage, theSender: ActorRef): Unit = {
     message match {
       case _ : RequestJobsOverview =>
@@ -175,7 +175,7 @@
         try {
          sender ! decorateMessage(createJobsOverview())
         }
         catch {
           case t: Throwable => log.error("Exception while creating the jobs overview", t)
         }
-        
+
       case _ : RequestJobsWithIDsOverview =>
         try {
           sender ! decorateMessage(createJobsWithIDsOverview())
@@ -188,7 +188,7 @@
         val details = graphs.values.map { v =>
           WebMonitorUtils.createDetailsForJob(v)
         }.toArray[JobDetails]
-        
+
         theSender ! decorateMessage(new MultipleJobsDetails(null, details))
     }
   }
@@ -198,7 +198,7 @@
     // so we aren't archiving it yet.
     if (archivePath.isDefined && graph.getState.isGloballyTerminalState) {
       try {
-        val p = FsStateBackend.validateAndNormalizeUri(archivePath.get.toUri)
+        val p = validateAndNormalizeUri(archivePath.get.toUri)
         future {
           try {
             FsJobArchivist.archiveJob(p, graph)
@@ -217,7 +217,7 @@
   // --------------------------------------------------------------------------
   // Request Responses
   // --------------------------------------------------------------------------
-  
+
   private def createJobsOverview() : JobsOverview = {
     new JobsOverview(0, finishedCnt, canceledCnt, failedCnt)
   }
@@ -239,7 +239,7 @@
     new JobsWithIDsOverview(runningOrPending, finished, canceled, failed)
   }
 
-  
+
   // --------------------------------------------------------------------------
   // Utilities
   // --------------------------------------------------------------------------
@@ -255,4 +255,48 @@
       graphs.remove(jobID)
     }
   }
+
+  /**
+   * Checks and normalizes the archive path URI. This method first checks the validity of the
+   * URI (scheme, path, availability of a matching file system) and then normalizes the URI
+   * to a path.
+   *
+   * If the URI does not include an authority, but the file system configured for the URI has an
+   * authority, then the normalized path will include this authority.
+   *
+   * @param archivePathUri The URI to check and normalize.
+   * @return a normalized URI as a Path.
+   *
+   * @throws IllegalArgumentException Thrown, if the URI misses scheme or path.
+   * @throws IOException Thrown, if no file system can be found for the URI's scheme.
+   */
+  @throws[IOException]
+  private def validateAndNormalizeUri(archivePathUri: URI): Path = {
+    val scheme = archivePathUri.getScheme
+    val path = archivePathUri.getPath
+
+    // some validity checks
+    if (scheme == null) {
+      throw new IllegalArgumentException("The scheme (hdfs://, file://, etc) is null. " +
+        "Please specify the file system scheme explicitly in the URI: " + archivePathUri)
+    }
+
+    if (path == null) {
+      throw new IllegalArgumentException("The path to store the job archives is null. " +
+        "Please specify a directory path for storing job archives. The given URI is: " +
+        archivePathUri)
+    }
+
+    if (path.length == 0 || path == "/") {
+      throw new IllegalArgumentException("Cannot use the root directory for storing job archives.")
+    }
+
+    if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) {
+      // fail fast for schemes that are not backed by a Flink-supported file system,
+      // since the required filesystem classes may not be available to the flink client
+      throw new IllegalArgumentException("No file system found with scheme " + scheme +
+        ", referenced in file URI '" + archivePathUri.toString + "'.")
+    }
+    new Path(archivePathUri)
+  }
 }
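
Illustration (not part of the patch): the new validateAndNormalizeUri helper rejects archive URIs that lack a scheme, that have no usable directory path, or whose scheme is not backed by one of Flink's supported file systems, and otherwise wraps the URI in a Path. The self-contained Scala sketch below mirrors the first two checks without any Flink dependencies so the intended behaviour can be tried in a plain REPL; it omits the FileSystem.isFlinkSupportedScheme check, and the names ArchiveUriValidation and normalize are made up for this example.

// Standalone sketch of the validation rules (no Flink dependencies).
import java.net.URI

object ArchiveUriValidation {

  /** Mirrors the scheme/path checks of MemoryArchivist.validateAndNormalizeUri. */
  def normalize(archivePathUri: URI): URI = {
    val scheme = archivePathUri.getScheme
    val path = archivePathUri.getPath

    if (scheme == null) {
      throw new IllegalArgumentException(
        "The scheme (hdfs://, file://, etc) is null in URI: " + archivePathUri)
    }
    if (path == null || path.isEmpty || path == "/") {
      throw new IllegalArgumentException(
        "A non-root directory path is required for job archives: " + archivePathUri)
    }
    archivePathUri
  }

  def main(args: Array[String]): Unit = {
    // accepted: scheme and a non-root path are present
    println(normalize(new URI("hdfs://namenode:9000/completed-jobs")))
    // rejected: missing scheme
    // normalize(new URI("/completed-jobs"))
    // rejected: root directory
    // normalize(new URI("file:///"))
  }
}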