From 76d3a6353a0a3f8960dc2b49d9eac6a4a279ea9e Mon Sep 17 00:00:00 2001 From: Kostas Kloudas Date: Tue, 19 Jan 2016 16:42:33 +0100 Subject: [PATCH] FLINK-2380: allow the specification of a default filesystem scheme in the flink configuration file. This closes #1524 --- docs/setup/config.md | 12 + .../org/apache/flink/client/CliFrontend.java | 8 + .../flink/client/FlinkYarnSessionCli.java | 3 +- .../flink/configuration/ConfigConstants.java | 13 ++ .../org/apache/flink/core/fs/FileSystem.java | 213 +++++++++++------- .../FilesystemSchemeConfigTest.java | 136 +++++++++++ .../GlobalConfigurationTest.java | 1 - .../flink/runtime/jobmanager/JobManager.scala | 13 +- .../runtime/taskmanager/TaskManager.scala | 15 +- .../TaskManagerConfigurationTest.java | 50 +++- .../flink/yarn/FlinkYarnClientBase.java | 7 + .../flink/yarn/ApplicationMasterBase.scala | 12 +- 12 files changed, 392 insertions(+), 91 deletions(-) create mode 100644 flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java diff --git a/docs/setup/config.md b/docs/setup/config.md index 59cccc313a9..79a9527788b 100644 --- a/docs/setup/config.md +++ b/docs/setup/config.md @@ -54,6 +54,13 @@ The configuration files for the TaskManagers can be different, Flink does not as - `parallelism.default`: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution. **Note**: The default parallelism can be overwritten for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten for single transformations by calling `setParallelism(int parallelism)` on an operator. See the [programming guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for more information about the parallelism. +- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
+By default, this is set to `file:///` which points to the local filesystem. This means that the local
+filesystem is going to be used to search for user-specified files **without** an explicit scheme
+definition. As another example, if this is set to `hdfs://localhost:9000/`, then a user-specified file path
+without an explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to be transformed into
+`hdfs://localhost:9000/user/USERNAME/in.txt`. This scheme is used **ONLY** if no other scheme is specified (explicitly) in the user-provided `URI`.
+
 - `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration **directory** (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like `hdfs://address:port/path/to/files`. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in the specified directory. ## Advanced Options @@ -194,6 +201,11 @@ The following parameters configure Flink's JobManager and TaskManagers.
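To make the transformation described in the `fs.default-scheme` bullet above concrete, the sketch below (illustrative only, not part of this patch; class name and values are made up) rebuilds a scheme-less user path against a configured default scheme, using the same `java.net.URI` composition this patch performs inside `FileSystem.get(URI)`:

```java
import java.net.URI;
import java.net.URISyntaxException;

// Illustrative only: rebuild a scheme-less path against a configured default scheme,
// mirroring the URI composition that FileSystem.get(URI) performs in this patch.
public class DefaultSchemeExample {
    public static void main(String[] args) throws URISyntaxException {
        URI defaultScheme = new URI("hdfs://localhost:9000/"); // example value of fs.default-scheme
        URI userPath = new URI("/user/USERNAME/in.txt");       // user path without a scheme

        URI resolved = new URI(defaultScheme.getScheme(), null,
                defaultScheme.getHost(), defaultScheme.getPort(),
                userPath.getPath(), null, null);

        // prints hdfs://localhost:9000/user/USERNAME/in.txt
        System.out.println(resolved);
    }
}
```

Running it prints `hdfs://localhost:9000/user/USERNAME/in.txt`, matching the example in the bullet above.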
The parameters define the behavior of tasks that create result files. +- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
+By default, this is set to `file:///` which points to the local filesystem. This means that the local
+filesystem is going to be used to search for user-specified files **without** an explicit scheme
+definition. This scheme is used **ONLY** if no other scheme is specified (explicitly) in the user-provided `URI`.
+
 - `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false) - `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will create the file directly at the output path, without creating a containing directory. (DEFAULT: false) diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 98bf056c71e..3f9014d1295 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -43,6 +43,7 @@ import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; import org.apache.flink.optimizer.costs.DefaultCostEstimator; @@ -231,6 +232,13 @@ public class CliFrontend { } } + try { + FileSystem.setDefaultScheme(config); + } catch (IOException e) { + throw new Exception("Error while setting the default " + + "filesystem scheme from configuration.", e); + } + this.clientTimeout = AkkaUtils.getClientTimeout(config); this.lookupTimeout = AkkaUtils.getLookupTimeout(config); } diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 4f540a65abb..14dc2893fde 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -393,7 +393,7 @@ public class FlinkYarnSessionCli { printUsage(); return 1; } - + // Query cluster for metrics if (cmd.hasOption(QUERY.getOpt())) { AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); @@ -415,7 +415,6 @@ public class FlinkYarnSessionCli { return 1; } - try { yarnCluster = flinkYarnClient.deploy(); // only connect to cluster if it's not a detached session.
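The `CliFrontend` hunk above wires the configured scheme into `FileSystem` once at client start-up and turns an unparsable value into a start-up error. A minimal sketch of that failure mode (class name and the broken value are illustrative, not from the patch):

```java
import java.io.IOException;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

// Illustrative sketch: an fs.default-scheme value that is not a valid URI makes
// FileSystem.setDefaultScheme(...) throw an IOException, which CliFrontend wraps
// into a start-up error. The value below is deliberately invalid.
public class InvalidDefaultSchemeExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        config.setString(ConfigConstants.FILESYSTEM_SCHEME, "not a valid uri");

        try {
            FileSystem.setDefaultScheme(config);
        } catch (IOException e) {
            // "The URI used to set the default filesystem scheme ('not a valid uri') is not valid."
            System.err.println(e.getMessage());
        }
    }
}
```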
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index b2bbda12c16..ccf90b5ac0c 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -335,6 +335,13 @@ public final class ConfigConstants { // ------------------------ File System Behavior ------------------------ + /** + * Key to specify the default filesystem to be used by a job. In the case of + * file:///, which is the default (see {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}), + * the local filesystem is going to be used to resolve URIs without an explicit scheme. + * */ + public static final String FILESYSTEM_SCHEME = "fs.default-scheme"; + /** * Key to specify whether the file systems should simply overwrite existing files. */ @@ -693,6 +700,12 @@ public final class ConfigConstants { // ------------------------ File System Behavior ------------------------ + /** + * The default filesystem to be used, if no other scheme is specified in the + * user-provided URI (= local filesystem) + * */ + public static final String DEFAULT_FILESYSTEM_SCHEME = "file:///"; + /** * The default behavior with respect to overwriting existing files (= not overwrite) */ diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java index 79ddd8a5d80..3e36b883aac 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java @@ -20,7 +20,7 @@ /** * This file is based on source code from the Hadoop Project (http://hadoop.apache.org/), licensed by the Apache * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for - * additional information regarding copyright ownership. + * additional information regarding copyright ownership. */ package org.apache.flink.core.fs; @@ -35,6 +35,8 @@ import java.util.HashMap; import java.util.Map; import org.apache.flink.annotation.Public; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; import org.apache.flink.util.OperatingSystem; /** @@ -46,30 +48,30 @@ import org.apache.flink.util.OperatingSystem; public abstract class FileSystem { private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem"; - + private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem"; private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem"; - + private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper"; - + /** Object used to protect calls to specific methods.*/ private static final Object SYNCHRONIZATION_OBJECT = new Object(); /** - * Enumeration for write modes. + * Enumeration for write modes. * */ public static enum WriteMode { - + /** Creates write path if it does not exist. Does not overwrite existing files and directories. */ NO_OVERWRITE, - + /** Creates write path if it does not exist. Overwrites existing files and directories. */ - OVERWRITE + OVERWRITE } - + /** * An auxiliary class to identify a file system by its scheme and its authority. */ @@ -88,7 +90,7 @@ public abstract class FileSystem { /** * Creates a file system key from a given scheme and an * authority. 
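The two constants added to `ConfigConstants` above act as a key/default pair: if `fs.default-scheme` is absent from the configuration, the lookup falls back to `file:///`. A small sketch of that lookup (class name is illustrative; the call mirrors what `FileSystem.setDefaultScheme`, shown further below, does internally):

```java
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

// Illustrative sketch of the key/default pair: with no fs.default-scheme entry,
// the lookup falls back to "file:///", so scheme-less paths stay on the local filesystem.
public class DefaultSchemeLookup {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        String scheme = conf.getString(
                ConfigConstants.FILESYSTEM_SCHEME,          // "fs.default-scheme"
                ConfigConstants.DEFAULT_FILESYSTEM_SCHEME); // "file:///"

        System.out.println(scheme); // file:///
    }
}
```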
- * + * * @param scheme * the scheme of the file system * @param authority @@ -159,7 +161,7 @@ public abstract class FileSystem { /** * Returns a reference to the {@link FileSystem} instance for accessing the * local file system. - * + * * @return a reference to the {@link FileSystem} instance for accessing the * local file system. */ @@ -174,10 +176,48 @@ public abstract class FileSystem { } } + /** + * The default filesystem scheme to be used. This can be specified by the parameter + * fs.default-scheme in flink-conf.yaml. By default this is + * set to file:/// (see {@link ConfigConstants#FILESYSTEM_SCHEME} + * and {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}), and uses the local filesystem. + */ + private static URI defaultScheme; + + /** + *
+ * Sets the default filesystem scheme based on the user-specified configuration parameter + * fs.default-scheme. By default this is set to file:/// + * (see {@link ConfigConstants#FILESYSTEM_SCHEME} and + * {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}), + * and the local filesystem is used. + *
+ * As an example, if set to hdfs://localhost:9000/, then an HDFS deployment + * with the namenode being on the local node and listening to port 9000 is going to be used. + * In this case, a file path specified as /user/USERNAME/in.txt + * is going to be transformed into hdfs://localhost:9000/user/USERNAME/in.txt. By + * default this is set to file:/// which points to the local filesystem. + * @param config the configuration from where to fetch the parameter. + */ + public static void setDefaultScheme(Configuration config) throws IOException { + synchronized (SYNCHRONIZATION_OBJECT) { + if (defaultScheme == null) { + String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME, + ConfigConstants.DEFAULT_FILESYSTEM_SCHEME); + try { + defaultScheme = new URI(stringifiedUri); + } catch (URISyntaxException e) { + throw new IOException("The URI used to set the default filesystem " + + "scheme ('" + stringifiedUri + "') is not valid."); + } + } + } + } + /** * Returns a reference to the {@link FileSystem} instance for accessing the * file system identified by the given {@link URI}. - * + * * @param uri * the {@link URI} identifying the file system * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given @@ -188,25 +228,40 @@ public abstract class FileSystem { public static FileSystem get(URI uri) throws IOException { FileSystem fs; + URI asked = uri; synchronized (SYNCHRONIZATION_OBJECT) { if (uri.getScheme() == null) { try { - uri = new URI("file", null, uri.getPath(), null); - } - catch (URISyntaxException e) { + if (defaultScheme == null) { + defaultScheme = new URI(ConfigConstants.DEFAULT_FILESYSTEM_SCHEME); + } + + uri = new URI(defaultScheme.getScheme(), null, defaultScheme.getHost(), + defaultScheme.getPort(), uri.getPath(), null, null); + + } catch (URISyntaxException e) { try { - uri = new URI("file", null, new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null); + if (defaultScheme.getScheme().equals("file")) { + uri = new URI("file", null, + new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null); + } } catch (URISyntaxException ex) { // we tried to repair it, but could not. report the scheme error - throw new IOException("The file URI '" + uri.toString() + "' is not valid."); + throw new IOException("The URI '" + uri.toString() + "' is not valid."); } } } - + + if(uri.getScheme() == null) { + throw new IOException("The URI '" + uri + "' is invalid.\n" + + "The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked + + ", and the final URI = " + uri + "."); + } + if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) { String supposedUri = "file:///" + uri.getAuthority() + uri.getPath(); - + throw new IOException("Found local file path with authority '" + uri.getAuthority() + "' in path '" + uri.toString() + "'. Hint: Did you forget a slash? (correct path would be '" + supposedUri + "')"); } @@ -239,7 +294,7 @@ public abstract class FileSystem { } else { // we can not read from this file system. throw new IOException("No file system found with scheme " + uri.getScheme() - + ", referenced in file URI '" + uri.toString() + "'."); + + ", referenced in file URI '" + uri.toString() + "'."); } } else { // we end up here if we have a file system with build-in flink support. @@ -299,7 +354,7 @@ public abstract class FileSystem { /** * Returns the path of the file system's current working directory. 
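Assuming a configuration like the one above and a matching filesystem implementation on the classpath (for example the Hadoop wrapper for `hdfs://`), the new resolution logic can be exercised roughly as follows; host, port, and paths are illustrative. Note that `setDefaultScheme` only takes effect on its first call, since the field is written only while it is still `null`:

```java
import java.net.URI;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

// Illustrative sketch of the resolution logic introduced above.
public class SchemeResolutionExample {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setString(ConfigConstants.FILESYSTEM_SCHEME, "hdfs://localhost:9000/");

        // Only the first call has an effect: the default scheme is set while still null.
        FileSystem.setDefaultScheme(config);

        // No scheme in the URI: get() fills in scheme, host and port from the default,
        // i.e. the path resolves to hdfs://localhost:9000/user/USERNAME/in.txt.
        FileSystem hdfs = FileSystem.get(new URI("/user/USERNAME/in.txt"));

        // An explicit scheme always wins over fs.default-scheme.
        FileSystem local = FileSystem.get(new URI("file:///tmp/in.txt"));
    }
}
```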
- * + * * @return the path of the file system's current working directory */ public abstract Path getWorkingDirectory(); @@ -313,14 +368,14 @@ public abstract class FileSystem { /** * Returns a URI whose scheme and authority identify this file system. - * + * * @return a URI whose scheme and authority identify this file system */ public abstract URI getUri(); /** * Called after a new FileSystem instance is constructed. - * + * * @param name * a {@link URI} whose authority section names the host, port, etc. for this file system */ @@ -328,7 +383,7 @@ public abstract class FileSystem { /** * Return a file status object that represents the path. - * + * * @param f * The path we want information from * @return a FileStatus object @@ -350,7 +405,7 @@ public abstract class FileSystem { /** * Opens an FSDataInputStream at the indicated Path. - * + * * @param f * the file name to open * @param bufferSize @@ -360,7 +415,7 @@ public abstract class FileSystem { /** * Opens an FSDataInputStream at the indicated Path. - * + * * @param f * the file to open */ @@ -368,7 +423,7 @@ public abstract class FileSystem { /** * Return the number of bytes that large input files should be optimally be split into to minimize I/O time. - * + * * @return the number of bytes that large input files should be optimally be split into to minimize I/O time */ public long getDefaultBlockSize() { @@ -378,7 +433,7 @@ public abstract class FileSystem { /** * List the statuses of the files/directories in the given path if the path is * a directory. - * + * * @param f * given path * @return the statuses of the files/directories in the given patch @@ -388,7 +443,7 @@ public abstract class FileSystem { /** * Check if exists. - * + * * @param f * source file */ @@ -403,7 +458,7 @@ public abstract class FileSystem { /** * Delete a file. - * + * * @param f * the path to delete * @param recursive @@ -417,7 +472,7 @@ public abstract class FileSystem { /** * Make the given file and all non-existent parents into directories. Has the semantics of Unix 'mkdir -p'. * Existence of the directory hierarchy is not an error. - * + * * @param f * the directory/directories to be created * @return true if at least one new directory has been created, false otherwise @@ -428,7 +483,7 @@ public abstract class FileSystem { /** * Opens an FSDataOutputStream at the indicated Path. - * + * * @param f * the file name to open * @param overwrite @@ -447,7 +502,7 @@ public abstract class FileSystem { /** * Opens an FSDataOutputStream at the indicated Path. - * + * * @param f * the file name to open * @param overwrite @@ -459,7 +514,7 @@ public abstract class FileSystem { /** * Renames the file/directory src to dst. - * + * * @param src * the file/directory to rename * @param dst @@ -469,34 +524,34 @@ public abstract class FileSystem { */ public abstract boolean rename(Path src, Path dst) throws IOException; - + /** * Initializes output directories on local file systems according to the given write mode. - * + * * WriteMode.NO_OVERWRITE & parallel output: * - A directory is created if the output path does not exist. * - An existing directory is reused, files contained in the directory are NOT deleted. * - An existing file raises an exception. - * + * * WriteMode.NO_OVERWRITE & NONE parallel output: * - An existing file or directory raises an exception. - * + * * WriteMode.OVERWRITE & parallel output: * - A directory is created if the output path does not exist. * - An existing directory is reused, files contained in the directory are NOT deleted. 
* - An existing file is deleted and replaced by a new directory. - * + * * WriteMode.OVERWRITE & NONE parallel output: * - An existing file or directory (and all its content) is deleted - * - * Files contained in an existing directory are not deleted, because multiple instances of a - * DataSinkTask might call this function at the same time and hence might perform concurrent - * delete operations on the file system (possibly deleting output files of concurrently running tasks). + * + * Files contained in an existing directory are not deleted, because multiple instances of a + * DataSinkTask might call this function at the same time and hence might perform concurrent + * delete operations on the file system (possibly deleting output files of concurrently running tasks). * Since concurrent DataSinkTasks are not aware of each other, coordination of delete and create * operations would be difficult. - * + * * @param outPath Output path that should be prepared. - * @param writeMode Write mode to consider. + * @param writeMode Write mode to consider. * @param createDirectory True, to initialize a directory at the given path, false otherwise. * @return True, if the path was successfully prepared, false otherwise. * @throws IOException @@ -505,19 +560,19 @@ public abstract class FileSystem { if (this.isDistributedFS()) { return false; } - + // NOTE: we sometimes see this code block fail due to a races when changes to the file system take small time fractions before being // visible to other threads. for example: // - the check whether the directory exists returns false // - the call to create the directory fails (some concurrent thread is creating the directory, locked) // - the call to check whether the directory exists does not yet see the new directory (change is not committed) - + // try for 30 seconds final long now = System.currentTimeMillis(); final long deadline = now + 30000; - + Exception lastError = null; - + do { FileStatus status = null; try { @@ -526,7 +581,7 @@ public abstract class FileSystem { catch (FileNotFoundException e) { // okay, the file is not there } - + // check if path exists if (status != null) { // path exists, check write mode @@ -536,11 +591,11 @@ public abstract class FileSystem { return true; } else { // file may not be overwritten - throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + - WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + + throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + + WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories."); } - + case OVERWRITE: if (status.isDir()) { if (createDirectory) { @@ -567,7 +622,7 @@ public abstract class FileSystem { } catch (IOException e) { // Some other thread might already have deleted the file. - // If - for some other reason - the file could not be deleted, + // If - for some other reason - the file could not be deleted, // the error will be handled later. lastError = e; } @@ -577,10 +632,10 @@ public abstract class FileSystem { throw new IllegalArgumentException("Invalid write mode: " + writeMode); } } - + if (createDirectory) { // Output directory needs to be created - + try { if (!this.exists(outPath)) { this.mkdirs(outPath); @@ -590,7 +645,7 @@ public abstract class FileSystem { // Some other thread might already have created the directory concurrently. 
lastError = e; } - + // double check that the output directory exists try { FileStatus check = getFileStatus(outPath); @@ -607,13 +662,13 @@ public abstract class FileSystem { catch (FileNotFoundException e) { // fall though the loop } - + } else { // check that the output path does not exist and an output file can be created by the output format. return !this.exists(outPath); } - + // small delay to allow changes to make progress try { Thread.sleep(10); @@ -623,34 +678,34 @@ public abstract class FileSystem { } } while (System.currentTimeMillis() < deadline); - + if (lastError != null) { throw new IOException("File system failed to prepare output path " + outPath + " with write mode " + writeMode.name(), lastError); } else { return false; } } - + /** * Initializes output directories on distributed file systems according to the given write mode. - * + * * WriteMode.NO_OVERWRITE & parallel output: * - A directory is created if the output path does not exist. * - An existing file or directory raises an exception. - * + * * WriteMode.NO_OVERWRITE & NONE parallel output: - * - An existing file or directory raises an exception. - * + * - An existing file or directory raises an exception. + * * WriteMode.OVERWRITE & parallel output: * - A directory is created if the output path does not exist. * - An existing directory and its content is deleted and a new directory is created. * - An existing file is deleted and replaced by a new directory. - * + * * WriteMode.OVERWRITE & NONE parallel output: * - An existing file or directory is deleted and replaced by a new directory. - * + * * @param outPath Output path that should be prepared. - * @param writeMode Write mode to consider. + * @param writeMode Write mode to consider. * @param createDirectory True, to initialize a directory at the given path, false otherwise. * @return True, if the path was successfully prepared, false otherwise. * @throws IOException @@ -659,15 +714,15 @@ public abstract class FileSystem { if (!this.isDistributedFS()) { return false; } - + // check if path exists if (this.exists(outPath)) { // path exists, check write mode switch(writeMode) { case NO_OVERWRITE: // file or directory may not be overwritten - throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + - WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + + throw new IOException("File or directory already exists. Existing files and directories are not overwritten in " + + WriteMode.NO_OVERWRITE.name() + " mode. Use " + WriteMode.OVERWRITE.name() + " mode to overwrite existing files and directories."); case OVERWRITE: // output path exists. We delete it and all contained files in case of a directory. @@ -675,7 +730,7 @@ public abstract class FileSystem { this.delete(outPath, true); } catch(IOException ioe) { // Some other thread might already have deleted the path. - // If - for some other reason - the path could not be deleted, + // If - for some other reason - the path could not be deleted, // this will be handled later. } break; @@ -683,7 +738,7 @@ public abstract class FileSystem { throw new IllegalArgumentException("Invalid write mode: "+writeMode); } } - + if (createDirectory) { // Output directory needs to be created try { @@ -695,24 +750,24 @@ public abstract class FileSystem { // If - for some other reason - the directory could not be created // and the path does not exist, this will be handled later. 
} - + // double check that the output directory exists return this.exists(outPath) && this.getFileStatus(outPath).isDir(); } else { - + // check that the output path does not exist and an output file can be created by the output format. return !this.exists(outPath); } - + } - + /** * Returns true if this is a distributed file system, false otherwise. - * + * * @return True if this is a distributed file system, false otherwise. */ public abstract boolean isDistributedFS(); - + private static Class getFileSystemByName(String className) throws ClassNotFoundException { return Class.forName(className, true, FileSystem.class.getClassLoader()).asSubclass(FileSystem.class); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java new file mode 100644 index 00000000000..8de4a225f5f --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.configuration; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.CommonTestUtils; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintWriter; +import java.lang.reflect.Field; +import java.net.URI; +import java.net.URISyntaxException; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class FilesystemSchemeConfigTest { + + @Before + public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException, + IllegalAccessException { + // reset GlobalConfiguration between tests + Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON"); + instance.setAccessible(true); + instance.set(null, null); + } + + @Test + public void testExplicitFilesystemScheme() { + testSettingFilesystemScheme(false, "fs.default-scheme: otherFS://localhost:1234/", true); + } + + @Test + public void testSettingFilesystemSchemeInConfiguration() { + testSettingFilesystemScheme(false, "fs.default-scheme: file:///", false); + } + + @Test + public void testUsingDefaultFilesystemScheme() { + testSettingFilesystemScheme(true, "fs.default-scheme: file:///", false); + } + + private void testSettingFilesystemScheme(boolean useDefaultScheme, + String configFileScheme, boolean useExplicitScheme) { + final File tmpDir = getTmpDir(); + final File confFile = createRandomFile(tmpDir, ".yaml"); + final File testFile = new File(tmpDir.getAbsolutePath() + File.separator + "testing.txt"); + + try { + try { + final PrintWriter pw1 = new PrintWriter(confFile); + if(!useDefaultScheme) { + pw1.println(configFileScheme); + } + pw1.close(); + + final PrintWriter pwTest = new PrintWriter(testFile); + pwTest.close(); + + } catch (FileNotFoundException e) { + fail(e.getMessage()); + } + + GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath()); + Configuration conf = GlobalConfiguration.getConfiguration(); + + try { + FileSystem.setDefaultScheme(conf); + String noSchemePath = testFile.toURI().getPath(); // remove the scheme. + + URI uri = new URI(noSchemePath); + // check if the scheme == null (so that we get the configuration one. + assertTrue(uri.getScheme() == null); + + // get the filesystem with the default scheme as set in the confFile1 + FileSystem fs = useExplicitScheme ? FileSystem.get(testFile.toURI()) : FileSystem.get(uri); + assertTrue(fs.exists(new Path(noSchemePath))); + + } catch (IOException e) { + fail(e.getMessage()); + } catch (URISyntaxException e) { + e.printStackTrace(); + } + } finally { + try { + // clear the default scheme set in the FileSystem class. + // we do it through reflection to avoid creating a publicly + // accessible method, which could also be wrongly used by users. 
+ + Field f = FileSystem.class.getDeclaredField("defaultScheme"); + f.setAccessible(true); + f.set(null, null); + } catch (IllegalAccessException e) { + e.printStackTrace(); + } catch (NoSuchFieldException e) { + e.printStackTrace(); + } + confFile.delete(); + testFile.delete(); + tmpDir.delete(); + } + } + + private File getTmpDir() { + File tmpDir = new File(CommonTestUtils.getTempDir() + File.separator + + CommonTestUtils.getRandomDirectoryName() + File.separator); + tmpDir.mkdirs(); + + return tmpDir; + } + + private File createRandomFile(File path, String suffix) { + return new File(path.getAbsolutePath() + File.separator + CommonTestUtils.getRandomDirectoryName() + suffix); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java index 41e97b206cb..5e1a4d0e96f 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java @@ -221,5 +221,4 @@ public class GlobalConfigurationTest extends TestLogger { private File createRandomFile(File path, String suffix) { return new File(path.getAbsolutePath() + File.separator + CommonTestUtils.getRandomDirectoryName() + suffix); } - } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 7430115d0e2..1a162250220 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -31,6 +31,7 @@ import grizzled.slf4j.Logger import org.apache.flink.api.common.{ExecutionConfig, JobID} import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} +import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.io.InputSplitAssigner import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} @@ -92,7 +93,7 @@ import scala.language.postfixOps * is indicated by [[CancellationSuccess]] and a failure by [[CancellationFailure]] * * - [[UpdateTaskExecutionState]] is sent by a TaskManager to update the state of an - * ExecutionVertex contained in the [[ExecutionGraph]]. + * ExecutionVertex contained in the [[ExecutionGraph]]. * A successful update is acknowledged by true and otherwise false. 
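Because `defaultScheme` is a private static field that is written only once, the tests above clear it via reflection between runs. A hedged sketch of that reset pattern, extracted into a helper (the helper class is illustrative; the field name comes from this patch):

```java
import java.lang.reflect.Field;

import org.apache.flink.core.fs.FileSystem;

// Illustrative helper capturing the reset pattern used by the tests in this patch:
// clear the private static FileSystem#defaultScheme field so that each test can
// install a different default scheme.
public final class DefaultSchemeTestUtil {

    private DefaultSchemeTestUtil() {}

    public static void clearDefaultScheme() throws NoSuchFieldException, IllegalAccessException {
        Field field = FileSystem.class.getDeclaredField("defaultScheme");
        field.setAccessible(true);
        field.set(null, null);
    }
}
```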
* * - [[RequestNextInputSplit]] requests the next input split for a running task on a @@ -1992,6 +1993,16 @@ object JobManager { GlobalConfiguration.loadConfiguration(configDir) val configuration = GlobalConfiguration.getConfiguration() + try { + FileSystem.setDefaultScheme(configuration) + } + catch { + case e: IOException => { + throw new Exception("Error while setting the default " + + "filesystem scheme from configuration.", e) + } + } + if (new File(configDir).isDirectory) { configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..") } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 2f46c8369c4..16b70629dd1 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -35,7 +35,8 @@ import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry} import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger import org.apache.flink.configuration._ -import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType} +import org.apache.flink.core.fs.FileSystem +import org.apache.flink.core.memory.{HybridMemorySegment, HeapMemorySegment, MemorySegmentFactory, MemoryType} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.blob.{BlobCache, BlobService} @@ -1422,7 +1423,17 @@ object TaskManager { catch { case e: Exception => throw new Exception("Could not load configuration", e) } - + + try { + FileSystem.setDefaultScheme(conf) + } + catch { + case e: IOException => { + throw new Exception("Error while setting the default " + + "filesystem scheme from configuration.", e) + } + } + conf } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java index 430e66959dd..f3677998d01 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerConfigurationTest.java @@ -21,15 +21,18 @@ package org.apache.flink.runtime.taskmanager; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.testutils.CommonTestUtils; import org.junit.Test; import scala.Tuple2; +import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.UnknownHostException; +import java.io.PrintWriter; +import java.lang.reflect.Field; +import java.net.*; import static org.junit.Assert.*; @@ -103,6 +106,38 @@ public class TaskManagerConfigurationTest { } } + @Test + public void testDefaultFsParameterLoading() { + final File tmpDir = getTmpDir(); + final File confFile = new File(tmpDir.getAbsolutePath() + File.separator + CommonTestUtils.getRandomDirectoryName() + ".yaml"); + + try { + final URI defaultFS = new URI("otherFS", null, "localhost", 1234, null, null, null); + + final 
PrintWriter pw1 = new PrintWriter(confFile); + pw1.println("fs.default-scheme: "+ defaultFS); + pw1.close(); + + String filepath = confFile.getAbsolutePath(); + + String[] args = new String[]{"--configDir:"+filepath}; + TaskManager.parseArgsAndLoadConfig(args); + + Field f = FileSystem.class.getDeclaredField("defaultScheme"); + f.setAccessible(true); + URI scheme = (URI) f.get(null); + + assertEquals("Default Filesystem Scheme not configured.", scheme, defaultFS); + } catch (FileNotFoundException e) { + fail(e.getMessage()); + } catch (Exception e) { + e.printStackTrace(); + } finally { + confFile.delete(); + tmpDir.delete(); + } + } + @Test public void testNetworkInterfaceSelection() { ServerSocket server; @@ -145,4 +180,11 @@ public class TaskManagerConfigurationTest { } } + private File getTmpDir() { + File tmpDir = new File(CommonTestUtils.getTempDir() + File.separator + + CommonTestUtils.getRandomDirectoryName() + File.separator); + tmpDir.mkdirs(); + + return tmpDir; + } } diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java index de7c93379dc..afedb334b64 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java @@ -364,6 +364,13 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient { flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue()); } + try { + org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration); + } catch (IOException e) { + LOG.error("Error while setting the default " + + "filesystem scheme from configuration.", e); + return null; + } // ------------------ Check if the specified queue exists -------------- try { diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala index 7579d7d0d39..c1b35cb8aeb 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterBase.scala @@ -18,13 +18,14 @@ package org.apache.flink.yarn -import java.io.{BufferedWriter, FileWriter, PrintWriter} +import java.io.{IOException, FileWriter, BufferedWriter, PrintWriter} import java.net.{BindException, ServerSocket} import java.security.PrivilegedAction import akka.actor.{ActorRef, ActorSystem} import org.apache.flink.client.CliFrontend -import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration} +import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants} +import org.apache.flink.core.fs.FileSystem import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode, MemoryArchivist} import org.apache.flink.runtime.util.EnvironmentInformation @@ -113,6 +114,13 @@ abstract class ApplicationMasterBase { config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0) } + try { + FileSystem.setDefaultScheme(config) + } catch { + case t: IOException => + log.error("Error while setting the default filesystem scheme from configuration.", t) + } + // we try to start the JobManager actor system using the port definition // from the config. // first, we check if the port is available by opening a socket -- GitLab
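Putting the pieces together, the following end-to-end sketch mirrors what the new tests do: write `fs.default-scheme` into a configuration directory, load it through `GlobalConfiguration`, and apply it via `FileSystem.setDefaultScheme`. Directory, file name, and the `hdfs://localhost:9000/` value are illustrative:

```java
import java.io.File;
import java.io.PrintWriter;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;

// Illustrative end-to-end sketch in the spirit of the tests in this patch: write the
// key into a configuration directory, load it, and apply it to FileSystem.
public class DefaultSchemeEndToEnd {
    public static void main(String[] args) throws Exception {
        File confDir = new File(System.getProperty("java.io.tmpdir"), "flink-conf-example");
        confDir.mkdirs();

        try (PrintWriter pw = new PrintWriter(new File(confDir, "flink-conf.yaml"))) {
            pw.println("fs.default-scheme: hdfs://localhost:9000/");
        }

        GlobalConfiguration.loadConfiguration(confDir.getAbsolutePath());
        Configuration config = GlobalConfiguration.getConfiguration();
        FileSystem.setDefaultScheme(config);

        // From here on, scheme-less paths passed to FileSystem.get() resolve against
        // hdfs://localhost:9000/, while explicitly qualified URIs are left untouched.
    }
}
```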