From a5ef09bb601cdd77fcb94e9ce633fdf979031aaf Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Mon, 2 Oct 2017 16:30:07 +0200 Subject: [PATCH] [FLINK-7643] [core] Drop eager checks for file system support. Some places validate if the file URIs are resolvable on the client. This leads to problems when file systems are not accessible from the client, when the full libraries for the file systems are not present on the client (for example often the case in cloud setups), or when the configuration on the client is different from the nodes/containers that will execute the application. --- .../org/apache/flink/core/fs/FileSystem.java | 11 ----- .../state/filesystem/FsStateBackend.java | 44 +------------------ .../runtime/jobmanager/MemoryArchivist.scala | 11 ++--- 3 files changed, 8 insertions(+), 58 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 7c69425ec4f..672ebbbfbf5 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -406,17 +406,6 @@ public abstract class FileSystem { return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri)); } - /** - * Returns a boolean indicating whether a scheme has built-in Flink support. - * - * @param scheme - * a file system scheme - * @return a boolean indicating whether the provided scheme has built-in Flink support - */ - public static boolean isFlinkSupportedScheme(String scheme) { - return FSDIRECTORY.containsKey(scheme); - } - //Class must implement Hadoop FileSystem interface. The class is not avaiable in 'flink-core'. private static FileSystem instantiateHadoopFileSystemWrapper(Class wrappedFileSystem) throws IOException { try { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java index e320bf3140d..ddfa85c97ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java @@ -31,12 +31,9 @@ import org.apache.flink.runtime.state.DefaultOperatorStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; -import java.net.URISyntaxException; import static org.apache.flink.util.Preconditions.checkArgument; @@ -53,8 +50,6 @@ public class FsStateBackend extends AbstractStateBackend { private static final long serialVersionUID = -8191916350224044011L; - private static final Logger LOG = LoggerFactory.getLogger(FsStateBackend.class); - /** By default, state smaller than 1024 bytes will not be written to files, but * will be stored directly with the metadata */ public static final int DEFAULT_FILE_STATE_THRESHOLD = 1024; @@ -341,7 +336,7 @@ public class FsStateBackend extends AbstractStateBackend { * @throws IllegalArgumentException Thrown, if the URI misses scheme or path. * @throws IOException Thrown, if no file system can be found for the URI's scheme. */ - public static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException { + private static Path validateAndNormalizeUri(URI checkpointDataUri) throws IOException { final String scheme = checkpointDataUri.getScheme(); final String path = checkpointDataUri.getPath(); @@ -358,41 +353,6 @@ public class FsStateBackend extends AbstractStateBackend { throw new IllegalArgumentException("Cannot use the root directory for checkpoints."); } - if (!FileSystem.isFlinkSupportedScheme(checkpointDataUri.getScheme())) { - // skip verification checks for non-flink supported filesystem - // this is because the required filesystem classes may not be available to the flink client - return new Path(checkpointDataUri); - } else { - // we do a bit of work to make sure that the URI for the filesystem refers to exactly the same - // (distributed) filesystem on all hosts and includes full host/port information, even if the - // original URI did not include that. We count on the filesystem loading from the configuration - // to fill in the missing data. - - // try to grab the file system for this path/URI - FileSystem filesystem = FileSystem.get(checkpointDataUri); - if (filesystem == null) { - String reason = "Could not find a file system for the given scheme in" + - "the available configurations."; - LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " + - "problem or by the fact that the file system is not accessible from the " + - "client. Reason:{}", reason); - return new Path(checkpointDataUri); - } - - URI fsURI = filesystem.getUri(); - try { - URI baseURI = new URI(fsURI.getScheme(), fsURI.getAuthority(), path, null, null); - return new Path(baseURI); - } catch (URISyntaxException e) { - String reason = String.format( - "Cannot create file system URI for checkpointDataUri %s and filesystem URI %s: " + e.toString(), - checkpointDataUri, - fsURI); - LOG.warn("Could not verify checkpoint path. This might be caused by a genuine " + - "problem or by the fact that the file system is not accessible from the " + - "client. Reason: {}", reason); - return new Path(checkpointDataUri); - } - } + return new Path(checkpointDataUri); } } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala index c963238cfee..7a10d01da0e 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala @@ -291,11 +291,12 @@ class MemoryArchivist( throw new IllegalArgumentException("Cannot use the root directory for storing job archives.") } - if (!FileSystem.isFlinkSupportedScheme(archivePathUri.getScheme)) { - // skip verification checks for non-flink supported filesystem - // this is because the required filesystem classes may not be available to the flink client - throw new IllegalArgumentException("No file system found with scheme " + scheme - + ", referenced in file URI '" + archivePathUri.toString + "'.") + try { + FileSystem.get(archivePathUri) + } + catch { + case e: Exception => + throw new IllegalArgumentException(s"No file system found for URI '${archivePathUri}'.") } new Path(archivePathUri) } -- GitLab