提交 ace25c88 编写于 作者: U Ufuk Celebi

[docs] Fix broken links and remove duplicate page

上级 c4e83c11
......@@ -110,7 +110,7 @@ The command line can be used to
### Savepoints
[Savepoints]({{site.baseurl}}/apis/savepoints.html) are controlled via the command line client:
[Savepoints]({{site.baseurl}}/apis/streaming/savepoints.html) are controlled via the command line client:
#### Trigger a savepoint
......
......@@ -103,7 +103,7 @@ One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a
users to use all existing Hadoop input formats with Flink.
This section shows some examples for connecting Flink to other systems.
[Read more about Hadoop compatibility in Flink](hadoop_compatibility.html).
[Read more about Hadoop compatibility in Flink]({{ site.baseurl }}/apis/batch/hadoop_compatibility.html).
## Avro support in Flink
......
---
title: "File Systems"
# Top-level navigation
top-nav-group: apis
top-nav-pos: 9
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
## Reading from file systems.
Flink has build-in support for the following file systems:
| Filesystem | Scheme | Notes |
| ------------------------------------- |--------------| ------ |
| Hadoop Distributed File System (HDFS) &nbsp; | `hdfs://` | All HDFS versions are supported |
| Amazon S3 | `s3://` | Support through Hadoop file system implementation (see below) |
| MapR file system | `maprfs://` | The user has to manually place the required jar files in the `lib/` dir |
| Tachyon | `tachyon://` &nbsp; | Support through Hadoop file system implementation (see below) |
### Using Hadoop file system implementations
Apache Flink allows users to use any file system implementing the `org.apache.hadoop.fs.FileSystem`
interface. There are Hadoop `FileSystem` implementations for
- [S3](https://aws.amazon.com/s3/) (tested)
- [Google Cloud Storage Connector for Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector) (tested)
- [Tachyon](http://tachyon-project.org/) (tested)
- [XtreemFS](http://www.xtreemfs.org/) (tested)
- FTP via [Hftp](http://hadoop.apache.org/docs/r1.2.1/hftp.html) (not tested)
- and many more.
In order to use a Hadoop file system with Flink, make sure that
- the `flink-conf.yaml` has set the `fs.hdfs.hadoopconf` property set to the Hadoop configuration directory.
- the Hadoop configuration (in that directory) has an entry for the required file system. Examples for S3 and Tachyon are shown below.
- the required classes for using the file system are available in the `lib/` folder of the Flink installation (on all machines running Flink). If putting the files into the directory is not possible, Flink is also respecting the `HADOOP_CLASSPATH` environment variable to add Hadoop jar files to the classpath.
#### Amazon S3
For Amazon S3 support add the following entries into the `core-site.xml` file:
~~~xml
<!-- configure the file system implementation -->
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
</property>
<!-- set your AWS ID -->
<property>
<name>fs.s3.awsAccessKeyId</name>
<value>putKeyHere</value>
</property>
<!-- set your AWS access key -->
<property>
<name>fs.s3.awsSecretAccessKey</name>
<value>putSecretHere</value>
</property>
~~~
#### Tachyon
For Tachyon support add the following entry into the `core-site.xml` file:
~~~xml
<property>
<name>fs.tachyon.impl</name>
<value>tachyon.hadoop.TFS</value>
</property>
~~~
## Connecting to other systems using Input/OutputFormat wrappers for Hadoop
Apache Flink allows users to access many different systems as data sources or sinks.
The system is designed for very easy extensibility. Similar to Apache Hadoop, Flink has the concept
of so called `InputFormat`s and `OutputFormat`s.
One implementation of these `InputFormat`s is the `HadoopInputFormat`. This is a wrapper that allows
users to use all existing Hadoop input formats with Flink.
This section shows some examples for connecting Flink to other systems.
[Read more about Hadoop compatibility in Flink]({{site.baseurl}}/apis/batch/hadoop_compatibility.html).
## Avro support in Flink
Flink has extensive build-in support for [Apache Avro](http://avro.apache.org/). This allows to easily read from Avro files with Flink.
Also, the serialization framework of Flink is able to handle classes generated from Avro schemas.
In order to read data from an Avro file, you have to specify an `AvroInputFormat`.
**Example**:
~~~java
AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
DataSet<User> usersDS = env.createInput(users);
~~~
Note that `User` is a POJO generated by Avro. Flink also allows to perform string-based key selection of these POJOs. For example:
~~~java
usersDS.groupBy("name")
~~~
Note that using the `GenericData.Record` type is possible with Flink, but not recommended. Since the record contains the full schema, its very data intensive and thus probably slow to use.
Flink's POJO field selection also works with POJOs generated from Avro. However, the usage is only possible if the field types are written correctly to the generated class. If a field is of type `Object` you can not use the field as a join or grouping key.
Specifying a field in Avro like this `{"name": "type_double_test", "type": "double"},` works fine, however specifying it as a UNION-type with only one field (`{"name": "type_double_test", "type": ["double"]},`) will generate a field of type `Object`. Note that specifying nullable types (`{"name": "type_double_test", "type": ["null", "double"]},`) is possible!
### Access Microsoft Azure Table Storage
_Note: This example works starting from Flink 0.6-incubating_
This example is using the `HadoopInputFormat` wrapper to use an existing Hadoop input format implementation for accessing [Azure's Table Storage](https://azure.microsoft.com/en-us/documentation/articles/storage-introduction/).
1. Download and compile the `azure-tables-hadoop` project. The input format developed by the project is not yet available in Maven Central, therefore, we have to build the project ourselves.
Execute the following commands:
~~~bash
git clone https://github.com/mooso/azure-tables-hadoop.git
cd azure-tables-hadoop
mvn clean install
~~~
2. Setup a new Flink project using the quickstarts:
~~~bash
curl https://flink.apache.org/q/quickstart.sh | bash
~~~
3. Add the following dependencies (in the `<dependencies>` section) to your `pom.xml` file:
~~~xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility{{ site.scala_version_suffix }}</artifactId>
<version>{{site.version}}</version>
</dependency>
<dependency>
<groupId>com.microsoft.hadoop</groupId>
<artifactId>microsoft-hadoop-azure</artifactId>
<version>0.0.4</version>
</dependency>
~~~
`flink-hadoop-compatibility` is a Flink package that provides the Hadoop input format wrappers.
`microsoft-hadoop-azure` is adding the project we've build before to our project.
The project is now prepared for starting to code. We recommend to import the project into an IDE, such as Eclipse or IntelliJ. (Import as a Maven project!).
Browse to the code of the `Job.java` file. Its an empty skeleton for a Flink job.
Paste the following code into it:
~~~java
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapreduce.HadoopInputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import com.microsoft.hadoop.azure.AzureTableConfiguration;
import com.microsoft.hadoop.azure.AzureTableInputFormat;
import com.microsoft.hadoop.azure.WritableEntity;
import com.microsoft.windowsazure.storage.table.EntityProperty;
public class AzureTableExample {
public static void main(String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// create a AzureTableInputFormat, using a Hadoop input format wrapper
HadoopInputFormat<Text, WritableEntity> hdIf = new HadoopInputFormat<Text, WritableEntity>(new AzureTableInputFormat(), Text.class, WritableEntity.class, new Job());
// set the Account URI, something like: https://apacheflink.table.core.windows.net
hdIf.getConfiguration().set(AzureTableConfiguration.Keys.ACCOUNT_URI.getKey(), "TODO");
// set the secret storage key here
hdIf.getConfiguration().set(AzureTableConfiguration.Keys.STORAGE_KEY.getKey(), "TODO");
// set the table name here
hdIf.getConfiguration().set(AzureTableConfiguration.Keys.TABLE_NAME.getKey(), "TODO");
DataSet<Tuple2<Text, WritableEntity>> input = env.createInput(hdIf);
// a little example how to use the data in a mapper.
DataSet<String> fin = input.map(new MapFunction<Tuple2<Text,WritableEntity>, String>() {
@Override
public String map(Tuple2<Text, WritableEntity> arg0) throws Exception {
System.err.println("--------------------------------\nKey = "+arg0.f0);
WritableEntity we = arg0.f1;
for(Map.Entry<String, EntityProperty> prop : we.getProperties().entrySet()) {
System.err.println("key="+prop.getKey() + " ; value (asString)="+prop.getValue().getValueAsString());
}
return arg0.f0.toString();
}
});
// emit result (this works only locally)
fin.print();
// execute program
env.execute("Azure Example");
}
}
~~~
The example shows how to access an Azure table and turn data into Flink's `DataSet` (more specifically, the type of the set is `DataSet<Tuple2<Text, WritableEntity>>`). With the `DataSet`, you can apply all known transformations to the DataSet.
## Access MongoDB
This [GitHub repository documents how to use MongoDB with Apache Flink (starting from 0.7-incubating)](https://github.com/okkam-it/flink-mongodb-test).
......@@ -28,7 +28,7 @@ Flink comes with an integrated interactive Scala Shell.
It can be used in a local setup as well as in a cluster setup. To get started with downloading
Flink and setting up a cluster please refer to
[local setup]({{ site.baseurl }}/setup/local_setup.html) or
[cluster setup]({{ site.baseurl }}/setup/cluster.html)
[cluster setup]({{ site.baseurl }}/setup/cluster_setup.html)
To use the shell with an integrated Flink cluster just execute:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册