提交 76d3a635 编写于 作者: K Kostas Kloudas 提交者: Robert Metzger

FLINK-2380: allow the specification of a default filesystem scheme in the flink configuration file.

This closes #1524
上级 c658763d
......@@ -54,6 +54,13 @@ The configuration files for the TaskManagers can be different, Flink does not as
- `parallelism.default`: The default parallelism to use for programs that have no parallelism specified. (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager will cause the system to use all available execution resources for the program's execution. **Note**: The default parallelism can be overwritten for an entire job by calling `setParallelism(int parallelism)` on the `ExecutionEnvironment` or by passing `-p <parallelism>` to the Flink Command-line frontend. It can be overwritten for single transformations by calling `setParallelism(int
parallelism)` on an operator. See the [programming guide]({{site.baseurl}}/apis/programming_guide.html#parallel-execution) for more information about the parallelism.
- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
By default, this is set to `file:///` which points to the local filesystem. This means that the local
filesystem is going to be used to search for user-specified files **without** an explicit scheme
definition. As another example, if this is set to `hdfs://localhost:9000/`, then a user-specified file path
without explicit scheme definition, such as `/user/USERNAME/in.txt`, is going to be transformed into
`hdfs://localhost:9000/user/USERNAME/in.txt`. This scheme is used **ONLY** if no other scheme is specified (explicitly) in the user-provided `URI`.
- `fs.hdfs.hadoopconf`: The absolute path to the Hadoop File System's (HDFS) configuration **directory** (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (`hdfs:///path/to/files`, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can be accessed, but require fully qualified URIs like `hdfs://address:port/path/to/files`. This option also causes file writers to pick up the HDFS's default values for block sizes and replication factors. Flink will look for the "core-site.xml" and "hdfs-site.xml" files in the specified directory.
## Advanced Options
......@@ -194,6 +201,11 @@ The following parameters configure Flink's JobManager and TaskManagers.
The parameters define the behavior of tasks that create result files.
- `fs.default-scheme`: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed).
By default, this is set to `file:///` which points to the local filesystem. This means that the local
filesystem is going to be used to search for user-specified files **without** an explicit scheme
definition. This scheme is used **ONLY** if no other scheme is specified (explicitly) in the user-provided `URI`.
- `fs.overwrite-files`: Specifies whether file output writers should overwrite existing files by default. Set to *true* to overwrite by default, *false* otherwise. (DEFAULT: false)
- `fs.output.always-create-directory`: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to *true*, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to *false*, the writer will directly create the file directly at the output path, without creating a containing directory. (DEFAULT: false)
......
......@@ -43,6 +43,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
......@@ -231,6 +232,13 @@ public class CliFrontend {
}
}
try {
FileSystem.setDefaultScheme(config);
} catch (IOException e) {
throw new Exception("Error while setting the default " +
"filesystem scheme from configuration.", e);
}
this.clientTimeout = AkkaUtils.getClientTimeout(config);
this.lookupTimeout = AkkaUtils.getLookupTimeout(config);
}
......
......@@ -415,7 +415,6 @@ public class FlinkYarnSessionCli {
return 1;
}
try {
yarnCluster = flinkYarnClient.deploy();
// only connect to cluster if its not a detached session.
......
......@@ -335,6 +335,13 @@ public final class ConfigConstants {
// ------------------------ File System Behavior ------------------------
/**
* Key to specify the default filesystem to be used by a job. In the case of
* <code>file:///</code>, which is the default (see {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}),
* the local filesystem is going to be used to resolve URIs without an explicit scheme.
* */
public static final String FILESYSTEM_SCHEME = "fs.default-scheme";
/**
* Key to specify whether the file systems should simply overwrite existing files.
*/
......@@ -693,6 +700,12 @@ public final class ConfigConstants {
// ------------------------ File System Behavior ------------------------
/**
* The default filesystem to be used, if no other scheme is specified in the
* user-provided URI (= local filesystem)
* */
public static final String DEFAULT_FILESYSTEM_SCHEME = "file:///";
/**
* The default behavior with respect to overwriting existing files (= not overwrite)
*/
......
......@@ -35,6 +35,8 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Public;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.OperatingSystem;
/**
......@@ -174,6 +176,44 @@ public abstract class FileSystem {
}
}
/**
* The default filesystem scheme to be used. This can be specified by the parameter
* <code>fs.default-scheme</code> in <code>flink-conf.yaml</code>. By default this is
* set to <code>file:///</code> (see {@link ConfigConstants#FILESYSTEM_SCHEME}
* and {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}), and uses the local filesystem.
*/
private static URI defaultScheme;
/**
 * Sets the default filesystem scheme based on the user-specified configuration parameter
 * <code>fs.default-scheme</code>. By default this is set to <code>file:///</code>
 * (see {@link ConfigConstants#FILESYSTEM_SCHEME} and
 * {@link ConfigConstants#DEFAULT_FILESYSTEM_SCHEME}),
 * and the local filesystem is used.
 * <p>
 * As an example, if set to <code>hdfs://localhost:9000/</code>, then an HDFS deployment
 * with the namenode being on the local node and listening to port 9000 is going to be used.
 * In this case, a file path specified as <code>/user/USERNAME/in.txt</code>
 * is going to be transformed into <code>hdfs://localhost:9000/user/USERNAME/in.txt</code>.
 *
 * @param config the configuration from where to fetch the parameter.
 * @throws IOException if the configured value is not a syntactically valid URI
 */
public static void setDefaultScheme(Configuration config) throws IOException {
	synchronized (SYNCHRONIZATION_OBJECT) {
		// only the first successful call takes effect; subsequent calls are
		// no-ops, so the default scheme stays stable for the process lifetime
		if (defaultScheme == null) {
			String stringifiedUri = config.getString(ConfigConstants.FILESYSTEM_SCHEME,
				ConfigConstants.DEFAULT_FILESYSTEM_SCHEME);
			try {
				defaultScheme = new URI(stringifiedUri);
			} catch (URISyntaxException e) {
				// chain the cause so the details of the invalid URI are not lost
				throw new IOException("The URI used to set the default filesystem " +
					"scheme ('" + stringifiedUri + "') is not valid.", e);
			}
		}
	}
}
/**
* Returns a reference to the {@link FileSystem} instance for accessing the
* file system identified by the given {@link URI}.
......@@ -188,22 +228,37 @@ public abstract class FileSystem {
public static FileSystem get(URI uri) throws IOException {
FileSystem fs;
URI asked = uri;
synchronized (SYNCHRONIZATION_OBJECT) {
if (uri.getScheme() == null) {
try {
uri = new URI("file", null, uri.getPath(), null);
if (defaultScheme == null) {
defaultScheme = new URI(ConfigConstants.DEFAULT_FILESYSTEM_SCHEME);
}
catch (URISyntaxException e) {
uri = new URI(defaultScheme.getScheme(), null, defaultScheme.getHost(),
defaultScheme.getPort(), uri.getPath(), null, null);
} catch (URISyntaxException e) {
try {
uri = new URI("file", null, new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
if (defaultScheme.getScheme().equals("file")) {
uri = new URI("file", null,
new Path(new File(uri.getPath()).getAbsolutePath()).toUri().getPath(), null);
}
} catch (URISyntaxException ex) {
// we tried to repair it, but could not. report the scheme error
throw new IOException("The file URI '" + uri.toString() + "' is not valid.");
throw new IOException("The URI '" + uri.toString() + "' is not valid.");
}
}
}
if(uri.getScheme() == null) {
throw new IOException("The URI '" + uri + "' is invalid.\n" +
"The fs.default-scheme = " + defaultScheme + ", the requested URI = " + asked +
", and the final URI = " + uri + ".");
}
if (uri.getScheme().equals("file") && uri.getAuthority() != null && !uri.getAuthority().isEmpty()) {
String supposedUri = "file:///" + uri.getAuthority() + uri.getPath();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
 * Tests that the default filesystem scheme ({@code fs.default-scheme}) is correctly
 * read from the configuration file and applied by {@link FileSystem} when resolving
 * URIs that carry no explicit scheme.
 */
public class FilesystemSchemeConfigTest {

	@Before
	public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
		IllegalAccessException {
		// reset GlobalConfiguration between tests; the singleton would otherwise
		// keep serving the configuration loaded by a previous test
		Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON");
		instance.setAccessible(true);
		instance.set(null, null);
	}

	@Test
	public void testExplicitFilesystemScheme() {
		// a fully-qualified URI must be honored even when a different default scheme is configured
		testSettingFilesystemScheme(false, "fs.default-scheme: otherFS://localhost:1234/", true);
	}

	@Test
	public void testSettingFilesystemSchemeInConfiguration() {
		// "file:///" set explicitly in the config file is applied to scheme-less URIs
		testSettingFilesystemScheme(false, "fs.default-scheme: file:///", false);
	}

	@Test
	public void testUsingDefaultFilesystemScheme() {
		// no entry in the config file: the built-in default ("file:///") applies
		testSettingFilesystemScheme(true, "fs.default-scheme: file:///", false);
	}

	/**
	 * Common test logic: writes a config file (optionally containing a
	 * {@code fs.default-scheme} entry), loads it, applies it to {@link FileSystem},
	 * and checks that a test file can be located through the resulting filesystem.
	 *
	 * @param useDefaultScheme if true, no scheme entry is written to the config file
	 * @param configFileScheme the {@code fs.default-scheme: ...} line to write otherwise
	 * @param useExplicitScheme if true, look up the test file via a fully-qualified URI
	 *                          instead of a scheme-less path
	 */
	private void testSettingFilesystemScheme(boolean useDefaultScheme,
			String configFileScheme, boolean useExplicitScheme) {
		final File tmpDir = getTmpDir();
		final File confFile = createRandomFile(tmpDir, ".yaml");
		final File testFile = new File(tmpDir.getAbsolutePath() + File.separator + "testing.txt");

		try {
			try {
				final PrintWriter pw1 = new PrintWriter(confFile);
				if(!useDefaultScheme) {
					pw1.println(configFileScheme);
				}
				pw1.close();
				// create an (empty) file that will be looked up later
				final PrintWriter pwTest = new PrintWriter(testFile);
				pwTest.close();
			} catch (FileNotFoundException e) {
				fail(e.getMessage());
			}

			GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
			Configuration conf = GlobalConfiguration.getConfiguration();

			try {
				FileSystem.setDefaultScheme(conf);
				String noSchemePath = testFile.toURI().getPath(); // remove the scheme.
				URI uri = new URI(noSchemePath);
				// check that the scheme is null, so that the configured default is used
				assertTrue(uri.getScheme() == null);

				// get the filesystem with the default scheme as set in the config file
				FileSystem fs = useExplicitScheme ? FileSystem.get(testFile.toURI()) : FileSystem.get(uri);
				assertTrue(fs.exists(new Path(noSchemePath)));
			} catch (IOException e) {
				fail(e.getMessage());
			} catch (URISyntaxException e) {
				e.printStackTrace();
			}
		} finally {
			try {
				// clear the default scheme set in the FileSystem class.
				// we do it through reflection to avoid creating a publicly
				// accessible method, which could also be wrongly used by users.
				Field f = FileSystem.class.getDeclaredField("defaultScheme");
				f.setAccessible(true);
				f.set(null, null);
			} catch (IllegalAccessException e) {
				e.printStackTrace();
			} catch (NoSuchFieldException e) {
				e.printStackTrace();
			}
			confFile.delete();
			testFile.delete();
			tmpDir.delete();
		}
	}

	/** Creates a fresh, randomly named temporary directory and returns it. */
	private File getTmpDir() {
		File tmpDir = new File(CommonTestUtils.getTempDir() + File.separator
			+ CommonTestUtils.getRandomDirectoryName() + File.separator);
		tmpDir.mkdirs();
		return tmpDir;
	}

	/** Returns a handle for a randomly named file with the given suffix inside {@code path}. */
	private File createRandomFile(File path, String suffix) {
		return new File(path.getAbsolutePath() + File.separator + CommonTestUtils.getRandomDirectoryName() + suffix);
	}
}
......@@ -221,5 +221,4 @@ public class GlobalConfigurationTest extends TestLogger {
/** Builds a handle for a randomly named file with the given suffix inside {@code path}. */
private File createRandomFile(File path, String suffix) {
	String fileName = CommonTestUtils.getRandomDirectoryName() + suffix;
	return new File(path.getAbsolutePath() + File.separator + fileName);
}
}
......@@ -31,6 +31,7 @@ import grizzled.slf4j.Logger
import org.apache.flink.api.common.{ExecutionConfig, JobID}
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.io.InputSplitAssigner
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
......@@ -1992,6 +1993,16 @@ object JobManager {
GlobalConfiguration.loadConfiguration(configDir)
val configuration = GlobalConfiguration.getConfiguration()
try {
FileSystem.setDefaultScheme(configuration)
}
catch {
case e: IOException => {
throw new Exception("Error while setting the default " +
"filesystem scheme from configuration.", e)
}
}
if (new File(configDir).isDirectory) {
configuration.setString(ConfigConstants.FLINK_BASE_DIR_PATH_KEY, configDir + "/..")
}
......
......@@ -35,7 +35,8 @@ import com.codahale.metrics.{Gauge, MetricFilter, MetricRegistry}
import com.fasterxml.jackson.databind.ObjectMapper
import grizzled.slf4j.Logger
import org.apache.flink.configuration._
import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, MemorySegmentFactory, MemoryType}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.core.memory.{HybridMemorySegment, HeapMemorySegment, MemorySegmentFactory, MemoryType}
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.blob.{BlobCache, BlobService}
......@@ -1423,6 +1424,16 @@ object TaskManager {
case e: Exception => throw new Exception("Could not load configuration", e)
}
try {
FileSystem.setDefaultScheme(conf)
}
catch {
case e: IOException => {
throw new Exception("Error while setting the default " +
"filesystem scheme from configuration.", e)
}
}
conf
}
......
......@@ -21,15 +21,18 @@ package org.apache.flink.runtime.taskmanager;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
import scala.Tuple2;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.UnknownHostException;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.net.*;
import static org.junit.Assert.*;
......@@ -103,6 +106,38 @@ public class TaskManagerConfigurationTest {
}
}
@Test
public void testDefaultFsParameterLoading() {
	// Verifies that the "fs.default-scheme" entry of the configuration file is
	// picked up by FileSystem when the TaskManager loads its configuration.
	final File tmpDir = getTmpDir();
	final File confFile = new File(tmpDir.getAbsolutePath() + File.separator + CommonTestUtils.getRandomDirectoryName() + ".yaml");

	try {
		final URI defaultFS = new URI("otherFS", null, "localhost", 1234, null, null, null);

		final PrintWriter pw1 = new PrintWriter(confFile);
		pw1.println("fs.default-scheme: "+ defaultFS);
		pw1.close();

		String filepath = confFile.getAbsolutePath();
		String[] args = new String[]{"--configDir:"+filepath};
		TaskManager.parseArgsAndLoadConfig(args);

		// read the private static field via reflection, to avoid widening FileSystem's public API
		Field f = FileSystem.class.getDeclaredField("defaultScheme");
		f.setAccessible(true);
		URI scheme = (URI) f.get(null);

		// JUnit convention: expected value first, then the actual one
		assertEquals("Default Filesystem Scheme not configured.", defaultFS, scheme);
	} catch (Exception e) {
		// any failure during setup or config loading must fail the test, not be swallowed
		fail(e.getMessage());
	} finally {
		try {
			// reset the static default scheme so this test does not leak state into other tests
			Field f = FileSystem.class.getDeclaredField("defaultScheme");
			f.setAccessible(true);
			f.set(null, null);
		} catch (Exception e) {
			e.printStackTrace();
		}
		confFile.delete();
		tmpDir.delete();
	}
}
@Test
public void testNetworkInterfaceSelection() {
ServerSocket server;
......@@ -145,4 +180,11 @@ public class TaskManagerConfigurationTest {
}
}
/** Creates a fresh, randomly named temporary directory and returns it. */
private File getTmpDir() {
	String dirPath = CommonTestUtils.getTempDir() + File.separator
		+ CommonTestUtils.getRandomDirectoryName() + File.separator;
	File tmpDir = new File(dirPath);
	tmpDir.mkdirs();
	return tmpDir;
}
}
......@@ -364,6 +364,13 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient {
flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
}
try {
org.apache.flink.core.fs.FileSystem.setDefaultScheme(flinkConfiguration);
} catch (IOException e) {
LOG.error("Error while setting the default " +
"filesystem scheme from configuration.", e);
return null;
}
// ------------------ Check if the specified queue exists --------------
try {
......
......@@ -18,13 +18,14 @@
package org.apache.flink.yarn
import java.io.{BufferedWriter, FileWriter, PrintWriter}
import java.io.{IOException, FileWriter, BufferedWriter, PrintWriter}
import java.net.{BindException, ServerSocket}
import java.security.PrivilegedAction
import akka.actor.{ActorRef, ActorSystem}
import org.apache.flink.client.CliFrontend
import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration}
import org.apache.flink.configuration.{GlobalConfiguration, Configuration, ConfigConstants}
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.jobmanager.{JobManager, JobManagerMode, MemoryArchivist}
import org.apache.flink.runtime.util.EnvironmentInformation
......@@ -113,6 +114,13 @@ abstract class ApplicationMasterBase {
config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 0)
}
try {
FileSystem.setDefaultScheme(config)
} catch {
case t: IOException =>
log.error("Error while setting the default filesystem scheme from configuration.", t)
}
// we try to start the JobManager actor system using the port definition
// from the config.
// first, we check if the port is available by opening a socket
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册