提交 f29f8057 编写于 作者: S Stephan Ewen 提交者: Aljoscha Krettek

[FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore

上级 3edbb7bc
......@@ -633,6 +633,11 @@ public abstract class FileSystem {
*/
public abstract boolean isDistributedFS();
/**
* Gets a description of the characteristics of this file system.
*/
public abstract FileSystemKind getKind();
// ------------------------------------------------------------------------
// output directory initialization
// ------------------------------------------------------------------------
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
import org.apache.flink.annotation.PublicEvolving;
/**
* An enumeration defining the kind and characteristics of a {@link FileSystem}.
*/
@PublicEvolving
public enum FileSystemKind {
/**
* An actual file system, with files and directories.
*/
FILE_SYSTEM,
/**
* An Object store. Files correspond to objects.
* There are not really directories, but a directory-like structure may be mimicked
* by hierarchical naming of files.
*/
OBJECT_STORE
}
......@@ -140,6 +140,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem implements WrappingPr
return unsafeFileSystem.isDistributedFS();
}
@Override
public FileSystemKind getKind() {
return unsafeFileSystem.getKind();
}
@Override
public FileSystem getWrappedDelegate() {
return unsafeFileSystem;
......
......@@ -31,6 +31,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.OperatingSystem;
......@@ -283,13 +284,18 @@ public class LocalFileSystem extends FileSystem {
return false;
}
@Override
public FileSystemKind getKind() {
return FileSystemKind.FILE_SYSTEM;
}
// ------------------------------------------------------------------------
/**
* Gets the URI that represents the local file system.
* That URI is {@code "file:/"} on Windows platforms and {@code "file:///"} on other
* UNIX family platforms.
*
*
* @return The URI that represents the local file system.
*/
public static URI getLocalFsURI() {
......@@ -298,7 +304,7 @@ public class LocalFileSystem extends FileSystem {
/**
* Gets the shared instance of this file system.
*
*
* @return The shared instance of this file system.
*/
public static LocalFileSystem getSharedInstance() {
......
......@@ -34,6 +34,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.FileUtils;
......@@ -312,4 +313,10 @@ public class LocalFileSystemTest {
assertTrue(fs.rename(new Path(srcFolder.toURI()), new Path(dstFolder.toURI())));
assertTrue(new File(dstFolder, srcFile.getName()).exists());
}
@Test
public void testKind() {
final FileSystem fs = FileSystem.getLocalFileSystem();
assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
}
}
......@@ -21,10 +21,12 @@ package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import java.io.IOException;
import java.net.URI;
import java.util.Locale;
import static org.apache.flink.util.Preconditions.checkNotNull;
......@@ -36,6 +38,11 @@ public class HadoopFileSystem extends FileSystem {
/** The wrapped Hadoop File System. */
private final org.apache.hadoop.fs.FileSystem fs;
/* This field caches the file system kind. It is lazily set because the file system
* URL is lazily initialized. */
private FileSystemKind fsKind;
/**
* Wraps the given Hadoop File System object as a Flink File System object.
* The given Hadoop file system object is expected to be initialized already.
......@@ -168,4 +175,44 @@ public class HadoopFileSystem extends FileSystem {
public boolean isDistributedFS() {
return true;
}
@Override
public FileSystemKind getKind() {
if (fsKind == null) {
fsKind = getKindForScheme(this.fs.getUri().getScheme());
}
return fsKind;
}
/**
* Gets the kind of the file system from its scheme.
*
* <p>Implementation note: Initially, especially within the Flink 1.3.x line
* (in order to not break backwards compatibility), we must only label file systems
* as 'inconsistent' or as 'not proper filesystems' if we are sure about it.
* Otherwise, we cause regression for example in the performance and cleanup handling
* of checkpoints.
* For that reason, we initially mark some filesystems as 'eventually consistent' or
* as 'object stores', and leave the others as 'consistent file systems'.
*/
static FileSystemKind getKindForScheme(String scheme) {
scheme = scheme.toLowerCase(Locale.US);
if (scheme.startsWith("s3") || scheme.startsWith("emr")) {
// the Amazon S3 storage
return FileSystemKind.OBJECT_STORE;
}
else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
// file servers instead of file systems
// they might actually be consistent, but we have no hard guarantees
// currently to rely on that
return FileSystemKind.OBJECT_STORE;
}
else {
// the remainder should include hdfs, kosmos, ceph, ...
// this also includes federated HDFS (viewfs).
return FileSystemKind.FILE_SYSTEM;
}
}
}
......@@ -18,6 +18,7 @@
package org.apache.flink.runtime.fs.maprfs;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.slf4j.Logger;
......@@ -172,4 +173,9 @@ public class MapRFileSystem extends HadoopFileSystem {
throw new IOException(String.format(
"Unable to find CLDB locations for cluster %s", authority));
}
@Override
public FileSystemKind getKind() {
return FileSystemKind.FILE_SYSTEM;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertEquals;
/**
* Tests for extracting the {@link FileSystemKind} from file systems that Flink
* accesses through Hadoop's File System interface.
*
* <p>This class needs to be in this package, because it accesses package private methods
* from the HDFS file system wrapper class.
*/
public class HdfsKindTest extends TestLogger {
@Test
public void testHdfsKind() throws IOException {
final FileSystem fs = new Path("hdfs://localhost:55445/my/file").getFileSystem();
assertEquals(FileSystemKind.FILE_SYSTEM, fs.getKind());
}
@Test
public void testS3Kind() throws IOException {
try {
Class.forName("org.apache.hadoop.fs.s3.S3FileSystem");
} catch (ClassNotFoundException ignored) {
// not in the classpath, cannot run this test
log.info("Skipping test 'testS3Kind()' because the S3 file system is not in the class path");
return;
}
final FileSystem s3 = new Path("s3://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
assertEquals(FileSystemKind.OBJECT_STORE, s3.getKind());
}
@Test
public void testS3nKind() throws IOException {
try {
Class.forName("org.apache.hadoop.fs.s3native.NativeS3FileSystem");
} catch (ClassNotFoundException ignored) {
// not in the classpath, cannot run this test
log.info("Skipping test 'testS3nKind()' because the Native S3 file system is not in the class path");
return;
}
final FileSystem s3n = new Path("s3n://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
assertEquals(FileSystemKind.OBJECT_STORE, s3n.getKind());
}
@Test
public void testS3aKind() throws IOException {
try {
Class.forName("org.apache.hadoop.fs.s3a.S3AFileSystem");
} catch (ClassNotFoundException ignored) {
// not in the classpath, cannot run this test
log.info("Skipping test 'testS3aKind()' because the S3AFileSystem is not in the class path");
return;
}
final FileSystem s3a = new Path("s3a://myId:mySecret@bucket/some/bucket/some/object").getFileSystem();
assertEquals(FileSystemKind.OBJECT_STORE, s3a.getKind());
}
@Test
public void testS3fileSystemSchemes() {
assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3"));
assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3n"));
assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("s3a"));
assertEquals(FileSystemKind.OBJECT_STORE, HadoopFileSystem.getKindForScheme("EMRFS"));
}
@Test
public void testViewFs() {
assertEquals(FileSystemKind.FILE_SYSTEM, HadoopFileSystem.getKindForScheme("viewfs"));
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册