From c024d819cfd37e05a8bac9db8082c1ef03d3f3bb Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Thu, 8 Jan 2015 12:28:06 +0100 Subject: [PATCH] [FLINK-1266] Properly pass the fs.defaulFS setting when initializing filesystems --- .../runtime/fs/hdfs/HadoopFileSystem.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index 32e3f2df896..37061bfb36d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -263,8 +263,8 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst @Override public void initialize(URI path) throws IOException { - // For HDFS we have to have an authority - if (path.getAuthority() == null && path.getScheme().equals("hdfs")) { + // If the authority is not part of the path, we initialize with the fs.defaultFS entry. + if (path.getAuthority() == null) { String configEntry = this.conf.get("fs.defaultFS", null); if (configEntry == null) { @@ -277,31 +277,29 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst } if (configEntry == null) { - throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " + - "or that configuration did not contain an entry for the default hdfs."); + throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system (hdfs) configuration was registered, " + + "or that configuration did not contain an entry for the default file system (usually 'fs.defaultFS')."); } else { try { URI initURI = URI.create(configEntry); if (initURI.getAuthority() == null) { - throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " + - "or the provided configuration contains no valid hdfs namenode address (fs.default.name or fs.defaultFS) describing the hdfs namenode host and port."); - } else if (!initURI.getScheme().equalsIgnoreCase("hdfs")) { - throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default hdfs configuration was registered, " + - "or the provided configuration describes a file system with scheme '" + initURI.getScheme() + "' other than the Hadoop Distributed File System (HDFS)."); + throw new IOException(getMissingAuthorityErrorPrefix(path) + "Either no default file system was registered, " + + "or the provided configuration contains no valid authority component (fs.default.name or fs.defaultFS) " + + "describing the (hdfs namenode) host and port."); } else { try { this.fs.initialize(initURI, this.conf); } catch (IOException e) { throw new IOException(getMissingAuthorityErrorPrefix(path) + - "Could not initialize the file system connection with the given address of the HDFS NameNode: " + e.getMessage(), e); + "Could not initialize the file system connection with the given default file system address: " + e.getMessage(), e); } } } catch (IllegalArgumentException e) { throw new IOException(getMissingAuthorityErrorPrefix(path) + - "The configuration contains an invalid hdfs default name (fs.default.name or fs.defaultFS): " + configEntry); + "The configuration contains an invalid file system default name (fs.default.name or fs.defaultFS): " + configEntry); } } } -- GitLab