Commit f2909293 authored by Robert Metzger

[FLINK-1266] Generalize DistributedFileSystem implementation

to a HadoopFileSystem wrapper, which supports all subclasses of org.apache.hadoop.fs.FileSystem.
This allows users to access any file system for which a Hadoop FileSystem implementation is available.
The change has been tested with Tachyon, Google Cloud Storage Hadoop Adapter and HDFS.

The change also cleans up the Hadoop dependency exclusions.
Parent d8dbaeeb
---
title: "Example: Connectors"
title: "Connecting to other systems"
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
......@@ -20,14 +20,56 @@ specific language governing permissions and limitations
under the License.
-->
Apache Flink allows users to access many different systems as data sources or sinks. The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept of so-called `InputFormat`s and `OutputFormat`s.
## Reading from file systems
One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows users to use all existing Hadoop input formats with Flink.
Flink has built-in support for the following file systems:
This page shows some examples for connecting Flink to other systems.
| Filesystem | Since | Scheme | Notes |
| ------------- |-------------| -----| ------ |
| Hadoop Distributed File System (HDFS) | 0.2 | `hdfs://`| All HDFS versions are supported |
| Amazon S3 | 0.2 | `s3://` | |
| MapR file system | 0.7-incubating | `maprfs://` | The user has to manually place the required jar files in the `lib/` dir |
| Tachyon | 0.9 | `tachyon://` | Support through Hadoop file system implementation (see below) |
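Paths with these schemes can be used directly with the data source and sink methods of the DataSet API. The following is a minimal sketch; the NameNode host, port, and paths are placeholder values, not part of this change:

~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class ReadFromHdfsExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// read a text file from HDFS; the other schemes in the table above work the same way
		DataSet<String> lines = env.readTextFile("hdfs://namenode-host:9000/path/to/input");

		// write the (unmodified) lines back to HDFS and run the job
		lines.writeAsText("hdfs://namenode-host:9000/path/to/output");
		env.execute("HDFS read/write example");
	}
}
~~~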
## Access Microsoft Azure Table Storage
### Using Hadoop file systems with Apache Flink
Apache Flink allows users to use any file system implementing the `org.apache.hadoop.fs.FileSystem`
interface. Hadoop ships adapters for FTP, [Hftp](http://hadoop.apache.org/docs/r1.2.1/hftp.html), and others.
Flink has integrated test cases to validate the integration with [Tachyon](http://tachyon-project.org/).
Another file system we tested the integration with is the
[Google Cloud Storage Connector for Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector).
In order to use a Hadoop file system with Flink, make sure that the `fs.hdfs.hadoopconf` property in `flink-conf.yaml` points to the Hadoop configuration directory.
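A minimal sketch of the corresponding `flink-conf.yaml` entry (the directory path is only a placeholder):

~~~yaml
# points Flink to the directory containing core-site.xml and hdfs-site.xml
fs.hdfs.hadoopconf: /path/to/etc/hadoop
~~~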
In addition, the Hadoop configuration (in that directory) needs an entry for each file system to be used.
For example, for Tachyon support, the following entry must be present in the `core-site.xml` file:
~~~xml
<property>
<name>fs.tachyon.impl</name>
<value>tachyon.hadoop.TFS</value>
</property>
~~~
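With that entry in place, Tachyon paths can be used in a job just like any other file system path. Using the same skeleton as the HDFS sketch above, only the path changes (master host and port are placeholders):

~~~java
// resolved through the Hadoop file system wrapper, using the fs.tachyon.impl entry from above
DataSet<String> lines = env.readTextFile("tachyon://tachyon-master:19998/input");
~~~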
## Connecting to other systems using Input / Output Format wrappers for Hadoop
Apache Flink allows users to access many different systems as data sources or sinks.
The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
of so-called `InputFormat`s and `OutputFormat`s.
One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
users to use all existing Hadoop input formats with Flink.
This section shows some examples for connecting Flink to other systems.
[Read more about Hadoop compatibility in Flink](hadoop_compatibility.html).
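As a concrete illustration, here is a sketch of wrapping Hadoop's `TextInputFormat` (from the `mapred` API) with the `HadoopInputFormat` wrapper; the package name and constructor follow the Hadoop-compatibility module of this release line and should be read as assumptions, and all paths are placeholders:

~~~java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class HadoopInputFormatExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// configure the Hadoop input format exactly as in a plain Hadoop job
		JobConf jobConf = new JobConf();
		FileInputFormat.addInputPath(jobConf, new Path("hdfs://namenode-host:9000/path/to/input"));

		// wrap it so that Flink can use it as a data source; records arrive as Tuple2<key, value>
		HadoopInputFormat<LongWritable, Text> hadoopIF =
				new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, jobConf);
		DataSet<Tuple2<LongWritable, Text>> input = env.createInput(hadoopIF);

		input.writeAsText("hdfs://namenode-host:9000/path/to/output");
		env.execute("HadoopInputFormat example");
	}
}
~~~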
### Access Microsoft Azure Table Storage
_Note: This example works starting from Flink 0.6-incubating_
......
......@@ -33,7 +33,8 @@ You can:
- use a Hadoop `Mapper` as [FlatMapFunction](dataset_transformations.html#flatmap).
- use a Hadoop `Reducer` as [GroupReduceFunction](dataset_transformations.html#groupreduce-on-grouped-dataset).
This document shows how to use existing Hadoop MapReduce code with Flink.
This document shows how to use existing Hadoop MapReduce code with Flink. Please refer to the
[Connecting to other systems](example_connectors.html) guide for reading from Hadoop supported file systems.
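For instance, a Hadoop `Mapper` can be plugged into a `flatMap` transformation through the `HadoopMapFunction` wrapper. This is only a sketch: the `Tokenizer` mapper and the exact generic parameters are assumptions for illustration.

~~~java
// 'text' is a DataSet<Tuple2<LongWritable, Text>>, e.g. obtained from a HadoopInputFormat source;
// the hypothetical Tokenizer is an existing org.apache.hadoop.mapred.Mapper implementation
DataSet<Tuple2<Text, LongWritable>> words = text.flatMap(
		new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer()));
~~~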
### Project Configuration
......
......@@ -18,7 +18,6 @@ specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......@@ -47,6 +46,12 @@ under the License.
<artifactId>flink-clients</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
......@@ -68,40 +73,6 @@ under the License.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
......
......@@ -22,7 +22,7 @@ package org.apache.flink.hadoopcompatibility.mapred.utils;
import java.lang.reflect.Constructor;
import java.util.Map;
import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.JobID;
......@@ -36,7 +36,7 @@ public class HadoopUtils {
* Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration.
*/
public static void mergeHadoopConf(JobConf jobConf) {
org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
org.apache.hadoop.conf.Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
for (Map.Entry<String, String> e : hadoopConf) {
jobConf.set(e.getKey(), e.getValue());
}
......
......@@ -21,7 +21,7 @@ package org.apache.flink.hadoopcompatibility.mapreduce.utils;
import java.lang.reflect.Constructor;
import java.util.Map;
import org.apache.flink.runtime.fs.hdfs.DistributedFileSystem;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
......@@ -34,7 +34,7 @@ public class HadoopUtils {
* Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration.
*/
public static void mergeHadoopConf(Configuration configuration) {
Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration();
Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
for (Map.Entry<String, String> e : hadoopConf) {
configuration.set(e.getKey(), e.getValue());
......
......@@ -17,8 +17,9 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
......@@ -41,7 +42,7 @@ under the License.
<artifactId>flink-shaded</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
......@@ -224,14 +225,13 @@ under the License.
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
</plugins>
</build>
</project>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-addons</artifactId>
<groupId>org.apache.flink</groupId>
<version>0.8-incubating-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-tachyon</artifactId>
<name>flink-tachyon</name>
<packaging>jar</packaging>
<!--
This is a Hadoop 2 only Flink module.
-->
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon</artifactId>
<version>0.5.0</version>
</dependency>
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon</artifactId>
<version>0.5.0</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-util</artifactId>
<version>7.6.8.v20121106</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
<version>${hadoop.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>7.6.8.v20121106</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>7.6.8.v20121106</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.tachyon;
import org.apache.commons.io.IOUtils;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
/**
* This test should logically be located in the 'flink-runtime' tests. However, this project
 * already has all required dependencies (flink-java-examples). Also, the DopOneTestEnvironment is located here.
*/
public class HDFSTest {
private String hdfsURI;
private MiniDFSCluster hdfsCluster;
private org.apache.hadoop.fs.Path hdPath;
private org.apache.hadoop.fs.FileSystem hdfs;
@Before
public void createHDFS() {
try {
Configuration hdConf = new Configuration();
File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
FileUtil.fullyDelete(baseDir);
hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
hdfsCluster = builder.build();
hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
hdPath = new org.apache.hadoop.fs.Path("/test");
hdfs = hdPath.getFileSystem(hdConf);
FSDataOutputStream stream = hdfs.create(hdPath);
for(int i = 0; i < 10; i++) {
stream.write("Hello HDFS\n".getBytes());
}
stream.close();
} catch(Throwable e) {
e.printStackTrace();
Assert.fail("Test failed " + e.getMessage());
}
}
@After
public void destroyHDFS() {
try {
hdfs.delete(hdPath, false);
hdfsCluster.shutdown();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Test
public void testHDFS() {
Path file = new Path(hdfsURI + hdPath);
org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
try {
FileSystem fs = file.getFileSystem();
Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
new TachyonFileSystemWrapperTest.DopOneTestEnvironment();
try {
WordCount.main(new String[]{file.toString(), result.toString()});
} catch(Throwable t) {
t.printStackTrace();
Assert.fail("Test failed with " + t.getMessage());
}
Assert.assertTrue("No result file present", hdfs.exists(result));
// validate output:
org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
StringWriter writer = new StringWriter();
IOUtils.copy(inStream, writer);
String resultString = writer.toString();
Assert.assertEquals("hdfs 10\n" +
"hello 10\n", resultString);
inStream.close();
} catch (IOException e) {
e.printStackTrace();
Assert.fail("Error in test: " + e.getMessage() );
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.tachyon;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.LocalEnvironment;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import tachyon.client.InStream;
import tachyon.client.OutStream;
import tachyon.client.ReadType;
import tachyon.client.TachyonFS;
import tachyon.client.TachyonFile;
import tachyon.client.WriteType;
import tachyon.master.LocalTachyonCluster;
import java.io.File;
import java.io.StringWriter;
import java.net.URISyntaxException;
import java.net.URL;
public class TachyonFileSystemWrapperTest {
private static final long TACHYON_WORKER_CAPACITY = 1024 * 1024 * 32;
private static final String TACHYON_TEST_IN_FILE_NAME = "tachyontest";
private static final String TACHYON_TEST_OUT_FILE_NAME = "result";
private static final Path HADOOP_CONFIG_PATH;
static {
URL resource = TachyonFileSystemWrapperTest.class.getResource("/tachyonHadoopConf.xml");
File file = null;
try {
file = new File(resource.toURI());
} catch (URISyntaxException e) {
throw new RuntimeException("Unable to load required resource", e);
}
if(!file.exists()) {
throw new RuntimeException("Unable to load required resource");
}
HADOOP_CONFIG_PATH = new Path(file.getAbsolutePath());
}
private LocalTachyonCluster cluster;
private TachyonFS client;
private String input;
private String output;
@Before
public void startTachyon() {
try {
cluster = new LocalTachyonCluster(TACHYON_WORKER_CAPACITY);
cluster.start();
client = cluster.getClient();
int id = client.createFile("/" + TACHYON_TEST_IN_FILE_NAME, 1024 * 32);
Assert.assertNotEquals("Unable to create file", -1, id);
TachyonFile testFile = client.getFile(id);
Assert.assertNotNull(testFile);
OutStream outStream = testFile.getOutStream(WriteType.MUST_CACHE);
for(int i = 0; i < 10; i++) {
outStream.write("Hello Tachyon\n".getBytes());
}
outStream.close();
final String tachyonBase = "tachyon://" + cluster.getMasterHostname() + ":" + cluster.getMasterPort();
input = tachyonBase + "/" + TACHYON_TEST_IN_FILE_NAME;
output = tachyonBase + "/" + TACHYON_TEST_OUT_FILE_NAME;
} catch(Exception e) {
e.printStackTrace();
Assert.fail("Test preparation failed with exception: "+e.getMessage());
}
}
@After
public void stopTachyon() {
try {
cluster.stop();
} catch(Exception e) {
e.printStackTrace();
Assert.fail("Test teardown failed with exception: "+e.getMessage());
}
}
// Verify that Hadoop's FileSystem can load the TFS (Tachyon File System)
@Test
public void testHadoopLoadability() {
try {
Path tPath = new Path(input);
Configuration conf = new Configuration();
conf.addResource(HADOOP_CONFIG_PATH);
Assert.assertEquals("tachyon.hadoop.TFS", conf.get("fs.tachyon.impl", null));
FileSystem hfs = tPath.getFileSystem(conf);
} catch(Exception e) {
e.printStackTrace();
Assert.fail("Test failed with exception: "+e.getMessage());
}
}
@Test
public void testTachyon() {
try {
org.apache.flink.configuration.Configuration addHDConfToFlinkConf = new org.apache.flink.configuration.Configuration();
addHDConfToFlinkConf.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, HADOOP_CONFIG_PATH.toString());
GlobalConfiguration.includeConfiguration(addHDConfToFlinkConf);
new DopOneTestEnvironment(); // initialize DOP one
WordCount.main(new String[]{input, output});
// List<Integer> files = client.listFiles("/", true);
// for(Integer file : files) {
// TachyonFile f = client.getFile(file);
// System.out.println("file = "+file+" f = "+f.getPath());
// }
// verify result
TachyonFile resultFile = client.getFile("/" + TACHYON_TEST_OUT_FILE_NAME);
Assert.assertNotNull("Result file has not been created", resultFile);
InStream inStream = resultFile.getInStream(ReadType.CACHE);
Assert.assertNotNull("Result file has not been created", inStream);
StringWriter writer = new StringWriter();
IOUtils.copy(inStream, writer);
String resultString = writer.toString();
Assert.assertEquals("hello 10\n" +
"tachyon 10\n", resultString);
} catch(Exception e) {
e.printStackTrace();
Assert.fail("Test failed with exception: "+e.getMessage());
}
}
// package visible
static final class DopOneTestEnvironment extends LocalEnvironment {
static {
initializeContextEnvironment(new ExecutionEnvironmentFactory() {
@Override
public ExecutionEnvironment createExecutionEnvironment() {
LocalEnvironment le = new LocalEnvironment();
le.setDegreeOfParallelism(1);
return le;
}
});
}
}
}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Tachyon's test-jar dependency adds a log4j.properties file to classpath.
# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
# we provide a log4j.properties file ourselves.
log4j.rootLogger=OFF
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>fs.tachyon.impl</name>
<value>tachyon.hadoop.TFS</value>
</property>
</configuration>
\ No newline at end of file
......@@ -82,158 +82,22 @@ under the License.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependency>
</dependencies>
<build>
......
......@@ -44,6 +44,22 @@ under the License.
<!-- See main pom.xml for explanation of profiles -->
<profiles>
<profile>
<id>hadoop-2</id>
<activation>
<property>
<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
<!--hadoop2--><name>!hadoop.profile</name>
</property>
</activation>
<modules>
<!-- Include the flink-tachyon project only for Hadoop 2.
The HDFS minicluster interfaces changed between the two Hadoop versions.
-->
<module>flink-tachyon</module>
</modules>
</profile>
<profile>
<id>include-yarn</id>
<activation>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.core.fs;
public interface AbstractHadoopWrapper {
/**
 * Test whether the Hadoop wrapper can wrap the given file system scheme.
 * @param scheme the file system scheme to check (for example "tachyon")
 * @return the Hadoop FileSystem implementation class handling the scheme, or null if the scheme is not supported
*/
public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme);
}
......@@ -28,6 +28,7 @@ package org.apache.flink.core.fs;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
......@@ -36,6 +37,8 @@ import java.util.Map;
import org.apache.flink.util.ClassUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An abstract base class for a fairly generic file system. It
......@@ -43,15 +46,18 @@ import org.apache.flink.util.StringUtils;
* one that reflects the locally-connected disk.
*/
public abstract class FileSystem {
private static final Logger LOG = LoggerFactory.getLogger(FileSystem.class);
private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
private static final String HADOOP_DISTRIBUTED_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.DistributedFileSystem";
private static final String HADOOP_WRAPPER_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.HadoopFileSystem";
private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
private static final String S3_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.s3.S3FileSystem";
private static final String HADOOP_WRAPPER_SCHEME = "hdwrapper";
/** Object used to protect calls to specific methods.*/
private static final Object SYNCHRONIZATION_OBJECT = new Object();
......@@ -148,7 +154,7 @@ public abstract class FileSystem {
private static final Map<String, String> FSDIRECTORY = new HashMap<String, String>();
static {
FSDIRECTORY.put("hdfs", HADOOP_DISTRIBUTED_FILESYSTEM_CLASS);
FSDIRECTORY.put("hdfs", HADOOP_WRAPPER_FILESYSTEM_CLASS);
FSDIRECTORY.put("maprfs", MAPR_FILESYSTEM_CLASS);
FSDIRECTORY.put("file", LOCAL_FILESYSTEM_CLASS);
FSDIRECTORY.put("s3", S3_FILESYSTEM_CLASS);
......@@ -188,7 +194,6 @@ public abstract class FileSystem {
* thrown if a reference to the file system instance could not be obtained
*/
public static FileSystem get(URI uri) throws IOException {
FileSystem fs = null;
synchronized (SYNCHRONIZATION_OBJECT) {
......@@ -222,38 +227,97 @@ public abstract class FileSystem {
}
// Try to create a new file system
if (!FSDIRECTORY.containsKey(uri.getScheme())) {
throw new IOException("No file system found with scheme " + uri.getScheme()
+ ", referenced in file URI '" + uri.toString() + "'.");
}
// no built-in support for this file system. Falling back to Hadoop's FileSystem implementation.
Class<?> wrapperClass = getHadoopWrapperClassNameForFileSystem(uri.getScheme());
if(wrapperClass != null) {
// hadoop has support for the FileSystem
FSKey wrappedKey = new FSKey(HADOOP_WRAPPER_SCHEME + "+" + uri.getScheme(), uri.getAuthority());
if (CACHE.containsKey(wrappedKey)) {
return CACHE.get(wrappedKey);
}
// cache didn't contain the file system. instantiate it:
// by now we know that the HadoopFileSystem wrapper can wrap the file system.
fs = instantiateHadoopFileSystemWrapper(wrapperClass);
fs.initialize(uri);
LOG.debug("Initializing new instance of wrapper for " + wrapperClass);
CACHE.put(wrappedKey, fs);
} else {
// we cannot read from this file system.
throw new IOException("No file system found with scheme " + uri.getScheme()
+ ", referenced in file URI '" + uri.toString() + "'.");
}
} else {
// we end up here if we have a file system with built-in Flink support.
String fsClass = FSDIRECTORY.get(uri.getScheme());
if(fsClass.equals(HADOOP_WRAPPER_FILESYSTEM_CLASS)) {
fs = instantiateHadoopFileSystemWrapper(null);
} else {
fs = instantiateFileSystem(fsClass);
}
LOG.debug("Initializing new instance of native class for " + fsClass);
// Initialize new file system object
fs.initialize(uri);
Class<? extends FileSystem> fsClass;
try {
fsClass = ClassUtils.getFileSystemByName(FSDIRECTORY.get(uri.getScheme()));
} catch (ClassNotFoundException e1) {
throw new IOException(StringUtils.stringifyException(e1));
// Add new file system object to cache
CACHE.put(key, fs);
}
}
try {
fs = fsClass.newInstance();
}
catch (InstantiationException e) {
throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
}
catch (IllegalAccessException e) {
throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
}
return fs;
}
// Initialize new file system object
fs.initialize(uri);
// The class must implement the Hadoop FileSystem interface. The class is not available in 'flink-core'.
private static FileSystem instantiateHadoopFileSystemWrapper(Class<?> wrappedFileSystem) throws IOException {
FileSystem fs = null;
Class<? extends FileSystem> fsClass;
try {
fsClass = ClassUtils.getFileSystemByName(HADOOP_WRAPPER_FILESYSTEM_CLASS);
Constructor<? extends FileSystem> fsClassCtor = fsClass.getConstructor(Class.class);
fs = fsClassCtor.newInstance(wrappedFileSystem);
} catch (Throwable e) {
throw new IOException("Error loading Hadoop FS wrapper", e);
}
return fs;
}
// Add new file system object to cache
CACHE.put(key, fs);
private static FileSystem instantiateFileSystem(String className) throws IOException {
FileSystem fs = null;
Class<? extends FileSystem> fsClass;
try {
fsClass = ClassUtils.getFileSystemByName(className);
} catch (ClassNotFoundException e1) {
throw new IOException(StringUtils.stringifyException(e1));
}
try {
fs = fsClass.newInstance();
}
catch (InstantiationException e) {
throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
}
catch (IllegalAccessException e) {
throw new IOException("Could not instantiate file system class: " + e.getMessage(), e);
}
return fs;
}
private static AbstractHadoopWrapper hadoopWrapper;
private static Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
if(hadoopWrapper == null) {
try {
hadoopWrapper = (AbstractHadoopWrapper) instantiateHadoopFileSystemWrapper(null);
} catch (IOException e) {
throw new RuntimeException("Error creating new Hadoop wrapper", e);
}
}
return hadoopWrapper.getHadoopWrapperClassNameForFileSystem(scheme);
}
/**
* Returns the path of the file system's current working directory.
*
......
......@@ -70,6 +70,7 @@ under the License.
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
......@@ -355,40 +356,6 @@ under the License.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
......@@ -404,86 +371,10 @@ under the License.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
......
......@@ -30,7 +30,7 @@ import org.apache.flink.core.fs.BlockLocation;
* Hadoop Distributed File System.
*
*/
public final class DistributedBlockLocation implements BlockLocation {
public final class HadoopBlockLocation implements BlockLocation {
/**
* Specifies the character separating the hostname from the domain name.
......@@ -58,7 +58,7 @@ public final class DistributedBlockLocation implements BlockLocation {
* @param blockLocation
* the original HDFS block location
*/
public DistributedBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
public HadoopBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
this.blockLocation = blockLocation;
}
......
......@@ -28,7 +28,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
* Hadoop Distributed File System.
*
*/
public final class DistributedDataInputStream extends FSDataInputStream {
public final class HadoopDataInputStream extends FSDataInputStream {
private org.apache.hadoop.fs.FSDataInputStream fsDataInputStream = null;
......@@ -38,7 +38,7 @@ public final class DistributedDataInputStream extends FSDataInputStream {
* @param fsDataInputStream
* the HDFS input stream
*/
public DistributedDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
public HadoopDataInputStream(org.apache.hadoop.fs.FSDataInputStream fsDataInputStream) {
this.fsDataInputStream = fsDataInputStream;
}
......
......@@ -23,11 +23,11 @@ import java.io.IOException;
import org.apache.flink.core.fs.FSDataOutputStream;
public final class DistributedDataOutputStream extends FSDataOutputStream {
public final class HadoopDataOutputStream extends FSDataOutputStream {
private org.apache.hadoop.fs.FSDataOutputStream fdos;
public DistributedDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos) {
public HadoopDataOutputStream(org.apache.hadoop.fs.FSDataOutputStream fdos) {
this.fdos = fdos;
}
......
......@@ -25,7 +25,7 @@ import org.apache.flink.core.fs.Path;
* Concrete implementation of the {@link FileStatus} interface for the
* Hadoop Distribution File System.
*/
public final class DistributedFileStatus implements FileStatus {
public final class HadoopFileStatus implements FileStatus {
private org.apache.hadoop.fs.FileStatus fileStatus;
......@@ -35,7 +35,7 @@ public final class DistributedFileStatus implements FileStatus {
* @param fileStatus
* the HDFS file status
*/
public DistributedFileStatus(org.apache.hadoop.fs.FileStatus fileStatus) {
public HadoopFileStatus(org.apache.hadoop.fs.FileStatus fileStatus) {
this.fileStatus = fileStatus;
}
......
......@@ -15,8 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.runtime.fs.hdfs;
import java.io.File;
......@@ -25,6 +23,7 @@ import java.lang.reflect.Method;
import java.net.URI;
import java.net.UnknownHostException;
import org.apache.flink.core.fs.AbstractHadoopWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.configuration.ConfigConstants;
......@@ -39,12 +38,16 @@ import org.apache.flink.util.InstantiationUtil;
import org.apache.hadoop.conf.Configuration;
/**
* Concrete implementation of the {@link FileSystem} base class for the Hadoop Distribution File System. The
* class is essentially a wrapper class which encapsulated the original Hadoop HDFS API.
* Concrete implementation of the {@link FileSystem} base class for the Hadoop File System. The
 * class is a wrapper class which encapsulates the original Hadoop HDFS API.
*
* If no file system class is specified, the wrapper will automatically load the Hadoop
* distributed file system (HDFS).
*
*/
public final class DistributedFileSystem extends FileSystem {
public final class HadoopFileSystem extends FileSystem implements AbstractHadoopWrapper {
private static final Logger LOG = LoggerFactory.getLogger(DistributedFileSystem.class);
private static final Logger LOG = LoggerFactory.getLogger(HadoopFileSystem.class);
private static final String DEFAULT_HDFS_CLASS = "org.apache.hadoop.hdfs.DistributedFileSystem";
......@@ -52,7 +55,6 @@ public final class DistributedFileSystem extends FileSystem {
* Configuration value name for the DFS implementation name. Usually not specified in hadoop configurations.
*/
private static final String HDFS_IMPLEMENTATION_KEY = "fs.hdfs.impl";
private final org.apache.hadoop.conf.Configuration conf;
......@@ -65,30 +67,37 @@ public final class DistributedFileSystem extends FileSystem {
* @throws IOException
* throw if the required HDFS classes cannot be instantiated
*/
public DistributedFileSystem() throws IOException {
public HadoopFileSystem(Class<? extends org.apache.hadoop.fs.FileSystem> fsClass) throws IOException {
// Create new Hadoop configuration object
this.conf = getHadoopConfiguration();
if(fsClass == null) {
fsClass = getDefaultHDFSClass();
}
this.fs = instantiateFileSystem(fsClass);
}
private Class<? extends org.apache.hadoop.fs.FileSystem> getDefaultHDFSClass() throws IOException {
Class<? extends org.apache.hadoop.fs.FileSystem> fsClass = null;
// try to get the FileSystem implementation class Hadoop 2.0.0 style
{
LOG.debug("Trying to load HDFS class Hadoop 2.x style.");
Object fsHandle = null;
try {
Method newApi = org.apache.hadoop.fs.FileSystem.class.getMethod("getFileSystemClass", String.class, org.apache.hadoop.conf.Configuration.class);
fsHandle = newApi.invoke(null, "hdfs", conf);
fsHandle = newApi.invoke(null, "hdfs", conf);
} catch (Exception e) {
// if we can't find the FileSystem class using the new API,
// clazz will still be null, we assume we're running on an older Hadoop version
}
if (fsHandle != null) {
if (fsHandle instanceof Class && org.apache.hadoop.fs.FileSystem.class.isAssignableFrom((Class<?>) fsHandle)) {
fsClass = ((Class<?>) fsHandle).asSubclass(org.apache.hadoop.fs.FileSystem.class);
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded '" + fsClass.getName() + "' as HDFS class.");
}
......@@ -99,7 +108,7 @@ public final class DistributedFileSystem extends FileSystem {
}
}
}
// fall back to an older Hadoop version
if (fsClass == null)
{
......@@ -110,12 +119,12 @@ public final class DistributedFileSystem extends FileSystem {
}
Class<?> classFromConfig = conf.getClass(HDFS_IMPLEMENTATION_KEY, null);
if (classFromConfig != null)
{
if (org.apache.hadoop.fs.FileSystem.class.isAssignableFrom(classFromConfig)) {
fsClass = classFromConfig.asSubclass(org.apache.hadoop.fs.FileSystem.class);
if (LOG.isDebugEnabled()) {
LOG.debug("Loaded HDFS class '" + fsClass.getName() + "' as specified in configuration.");
}
......@@ -124,9 +133,9 @@ public final class DistributedFileSystem extends FileSystem {
if (LOG.isDebugEnabled()) {
LOG.debug("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY + " is of wrong type.");
}
throw new IOException("HDFS class specified by " + HDFS_IMPLEMENTATION_KEY +
" cannot be cast to a FileSystem type.");
" cannot be cast to a FileSystem type.");
}
}
else {
......@@ -134,7 +143,7 @@ public final class DistributedFileSystem extends FileSystem {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to load default HDFS implementation " + DEFAULT_HDFS_CLASS);
}
try {
Class <?> reflectedClass = Class.forName(DEFAULT_HDFS_CLASS);
if (org.apache.hadoop.fs.FileSystem.class.isAssignableFrom(reflectedClass)) {
......@@ -143,25 +152,24 @@ public final class DistributedFileSystem extends FileSystem {
if (LOG.isDebugEnabled()) {
LOG.debug("Default HDFS class is of wrong type.");
}
throw new IOException("The default HDFS class '" + DEFAULT_HDFS_CLASS +
"' cannot be cast to a FileSystem type.");
throw new IOException("The default HDFS class '" + DEFAULT_HDFS_CLASS +
"' cannot be cast to a FileSystem type.");
}
}
catch (ClassNotFoundException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Default HDFS class cannot be loaded.");
}
throw new IOException("No HDFS class has been configured and the default class '" +
DEFAULT_HDFS_CLASS + "' cannot be loaded.");
}
}
}
this.fs = instantiateFileSystem(fsClass);
return fsClass;
}
/**
* Returns a new Hadoop Configuration object using the path to the hadoop conf configured
* in the main configuration (flink-conf.yaml).
......@@ -256,12 +264,12 @@ public final class DistributedFileSystem extends FileSystem {
public void initialize(URI path) throws IOException {
// For HDFS we have to have an authority
if (path.getAuthority() == null) {
if (path.getAuthority() == null && path.getScheme().equals("hdfs")) {
String configEntry = this.conf.get("fs.default.name", null);
String configEntry = this.conf.get("fs.defaultFS", null);
if (configEntry == null) {
// fs.default.name deprecated as of hadoop 2.2.0 http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/DeprecatedProperties.html
configEntry = this.conf.get("fs.defaultFS", null);
configEntry = this.conf.get("fs.default.name", null);
}
if (LOG.isDebugEnabled()) {
......@@ -298,12 +306,12 @@ public final class DistributedFileSystem extends FileSystem {
}
}
else {
// Initialize HDFS
// Initialize file system
try {
this.fs.initialize(path, this.conf);
}
catch (UnknownHostException e) {
String message = "The HDFS NameNode host at '" + path.getAuthority()
String message = "The (HDFS NameNode) host at '" + path.getAuthority()
+ "', specified by file path '" + path.toString() + "', cannot be resolved"
+ (e.getMessage() != null ? ": " + e.getMessage() : ".");
......@@ -331,26 +339,26 @@ public final class DistributedFileSystem extends FileSystem {
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
return new DistributedFileStatus(status);
return new HadoopFileStatus(status);
}
@Override
public BlockLocation[] getFileBlockLocations(final FileStatus file, final long start, final long len)
throws IOException
{
if (!(file instanceof DistributedFileStatus)) {
if (!(file instanceof HadoopFileStatus)) {
throw new IOException("file is not an instance of HadoopFileStatus");
}
final DistributedFileStatus f = (DistributedFileStatus) file;
final HadoopFileStatus f = (HadoopFileStatus) file;
final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
start, len);
// Wrap up HDFS specific block location objects
final DistributedBlockLocation[] distBlkLocations = new DistributedBlockLocation[blkLocations.length];
final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
for (int i = 0; i < distBlkLocations.length; i++) {
distBlkLocations[i] = new DistributedBlockLocation(blkLocations[i]);
distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
}
return distBlkLocations;
......@@ -362,13 +370,13 @@ public final class DistributedFileSystem extends FileSystem {
final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(new org.apache.hadoop.fs.Path(f.toString()),
bufferSize);
return new DistributedDataInputStream(fdis);
return new HadoopDataInputStream(fdis);
}
@Override
public FSDataInputStream open(final Path f) throws IOException {
final org.apache.hadoop.fs.FSDataInputStream fdis = fs.open(new org.apache.hadoop.fs.Path(f.toString()));
return new DistributedDataInputStream(fdis);
return new HadoopDataInputStream(fdis);
}
@Override
......@@ -378,7 +386,7 @@ public final class DistributedFileSystem extends FileSystem {
{
final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
new org.apache.hadoop.fs.Path(f.toString()), overwrite, bufferSize, replication, blockSize);
return new DistributedDataOutputStream(fdos);
return new HadoopDataOutputStream(fdos);
}
......@@ -386,7 +394,7 @@ public final class DistributedFileSystem extends FileSystem {
public FSDataOutputStream create(final Path f, final boolean overwrite) throws IOException {
final org.apache.hadoop.fs.FSDataOutputStream fsDataOutputStream = this.fs
.create(new org.apache.hadoop.fs.Path(f.toString()), overwrite);
return new DistributedDataOutputStream(fsDataOutputStream);
return new HadoopDataOutputStream(fsDataOutputStream);
}
@Override
......@@ -401,7 +409,7 @@ public final class DistributedFileSystem extends FileSystem {
// Convert types
for (int i = 0; i < files.length; i++) {
files[i] = new DistributedFileStatus(hadoopFiles[i]);
files[i] = new HadoopFileStatus(hadoopFiles[i]);
}
return files;
......@@ -428,4 +436,23 @@ public final class DistributedFileSystem extends FileSystem {
public boolean isDistributedFS() {
return true;
}
@Override
public Class<?> getHadoopWrapperClassNameForFileSystem(String scheme) {
Configuration hadoopConf = getHadoopConfiguration();
Class<? extends org.apache.hadoop.fs.FileSystem> clazz = null;
// We can activate this block once we drop Hadoop1 support (only hd2 has the getFileSystemClass-method)
// try {
// clazz = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConf);
// } catch (IOException e) {
// LOG.info("Flink could not load the Hadoop File system implementation for scheme "+scheme);
// return null;
// }
clazz = hadoopConf.getClass("fs." + scheme + ".impl", null, org.apache.hadoop.fs.FileSystem.class);
if(clazz != null && LOG.isDebugEnabled()) {
LOG.debug("Flink supports " + scheme + " with the Hadoop file system wrapper, impl " + clazz);
}
return clazz;
}
}
......@@ -35,10 +35,10 @@ import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.DistributedBlockLocation;
import org.apache.flink.runtime.fs.hdfs.DistributedDataInputStream;
import org.apache.flink.runtime.fs.hdfs.DistributedDataOutputStream;
import org.apache.flink.runtime.fs.hdfs.DistributedFileStatus;
import org.apache.flink.runtime.fs.hdfs.HadoopBlockLocation;
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
import org.apache.flink.runtime.fs.hdfs.HadoopFileStatus;
/**
* Concrete implementation of the {@link FileSystem} base class for the MapR
......@@ -268,27 +268,27 @@ public final class MapRFileSystem extends FileSystem {
final org.apache.hadoop.fs.FileStatus status = this.fs
.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
return new DistributedFileStatus(status);
return new HadoopFileStatus(status);
}
@Override
public BlockLocation[] getFileBlockLocations(final FileStatus file,
final long start, final long len) throws IOException {
if (!(file instanceof DistributedFileStatus)) {
if (!(file instanceof HadoopFileStatus)) {
throw new IOException(
		"file is not an instance of HadoopFileStatus");
}
final DistributedFileStatus f = (DistributedFileStatus) file;
final HadoopFileStatus f = (HadoopFileStatus) file;
final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs
.getFileBlockLocations(f.getInternalFileStatus(), start, len);
// Wrap up HDFS specific block location objects
final DistributedBlockLocation[] distBlkLocations = new DistributedBlockLocation[blkLocations.length];
final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
for (int i = 0; i < distBlkLocations.length; i++) {
distBlkLocations[i] = new DistributedBlockLocation(blkLocations[i]);
distBlkLocations[i] = new HadoopBlockLocation(blkLocations[i]);
}
return distBlkLocations;
......@@ -301,7 +301,7 @@ public final class MapRFileSystem extends FileSystem {
final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(
new org.apache.hadoop.fs.Path(f.toString()), bufferSize);
return new DistributedDataInputStream(fdis);
return new HadoopDataInputStream(fdis);
}
@Override
......@@ -310,7 +310,7 @@ public final class MapRFileSystem extends FileSystem {
final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs
.open(new org.apache.hadoop.fs.Path(f.toString()));
return new DistributedDataInputStream(fdis);
return new HadoopDataInputStream(fdis);
}
@Override
......@@ -322,7 +322,7 @@ public final class MapRFileSystem extends FileSystem {
new org.apache.hadoop.fs.Path(f.toString()), overwrite,
bufferSize, replication, blockSize);
return new DistributedDataOutputStream(fdos);
return new HadoopDataOutputStream(fdos);
}
@Override
......@@ -332,7 +332,7 @@ public final class MapRFileSystem extends FileSystem {
final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
new org.apache.hadoop.fs.Path(f.toString()), overwrite);
return new DistributedDataOutputStream(fdos);
return new HadoopDataOutputStream(fdos);
}
@Override
......@@ -352,7 +352,7 @@ public final class MapRFileSystem extends FileSystem {
// Convert types
for (int i = 0; i < files.length; i++) {
files[i] = new DistributedFileStatus(hadoopFiles[i]);
files[i] = new HadoopFileStatus(hadoopFiles[i]);
}
return files;
......
......@@ -307,17 +307,127 @@ under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- YARN -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
......@@ -328,11 +438,79 @@ under the License.
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-compiler</artifactId>
</exclusion>
<exclusion>
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-api-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jsp-2.1</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.jdt</groupId>
<artifactId>core</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</dependencyManagement>
......