Unverified commit e00ec886, authored by Piyush Narang, committed by Till Rohrmann

[FLINK-12115][fs] Add support for AzureFS

Check for http enabled storage accounts in AzureFS IT tests

Add AzureFS standalone E2E test
Parent 6489237b
---
title: "Azure Blob Storage"
nav-title: Azure Blob Storage
nav-parent_id: filesystems
nav-pos: 3
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases.
You can use Azure Blob Storage with Flink for **reading** and **writing data**, as well as in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html).
* This will be replaced by the TOC
{:toc}
You can use Azure Blob Storage objects like regular files by specifying paths in the following format:
{% highlight plain %}
wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
// SSL encrypted access
wasbs://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>
{% endhighlight %}
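For example, with a hypothetical container named `test-container` in a hypothetical storage account named `flinkdocs`, the file `data/words.txt` would be addressed as:
{% highlight plain %}
wasbs://test-container@flinkdocs.blob.core.windows.net/data/words.txt
{% endhighlight %}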
The following example shows how to use Azure Blob Storage with Flink:
{% highlight java %}
// Read from Azure Blob storage
env.readTextFile("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");

// Write to Azure Blob storage
stream.writeAsText("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>");

// Use Azure Blob Storage as the FsStateBackend
env.setStateBackend(new FsStateBackend("wasb://<your-container>@<your-azure-account>.blob.core.windows.net/<object-path>"));
{% endhighlight %}
### Shaded Hadoop Azure Blob Storage file system
To use `flink-azure-fs-hadoop`, copy the respective JAR file from the `opt` directory to the `lib` directory of your Flink distribution before starting Flink, e.g.
{% highlight bash %}
cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/
{% endhighlight %}
`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) schemes.
#### Credentials configuration
After setting up the Azure Blob Storage FileSystem wrapper, you need to configure credentials to make sure that Flink is allowed to access Azure Blob Storage.
To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml`.
You can see the configuration keys in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials).
The following configuration key must be added to `flink-conf.yaml`:
{% highlight yaml %}
fs.azure.account.key.youraccount.blob.core.windows.net: <your-azure-blob-storage-access-key>
{% endhighlight %}
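For the hypothetical `flinkdocs` account from the path example above, the entry would pair the account's blob endpoint with its access key:
{% highlight yaml %}
fs.azure.account.key.flinkdocs.blob.core.windows.net: <your-actual-access-key>
{% endhighlight %}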
{% top %}
......@@ -25,7 +25,7 @@ under the License.
-->
Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery.
These are some of the most popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*.
These are some of the most popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, *Aliyun OSS* and *Azure Blob Storage*.
The file system used for a particular file is determined by its URI scheme.
For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster.
......@@ -43,12 +43,17 @@ Flink ships with implementations for the following file systems:
- **S3**: Flink directly provides file systems to talk to Amazon S3 with two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint.
- **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath.
- **OpenStack Swift FS**: Flink directly provides a file system to talk to the OpenStack Swift file system, registered under the scheme *"swift://"*.
The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint.
To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}`).
When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder.
- **Azure Blob Storage**:
Flink directly provides a file system to work with Azure Blob Storage.
This file system is registered under the scheme *"wasb(s)://"*.
The implementation is self-contained with no dependency footprint.
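A minimal sketch of enabling it in a standalone Flink distribution (assuming the shaded JAR ships in the `opt` folder, as described in the Azure Blob Storage documentation above):
{% highlight bash %}
cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/
{% endhighlight %}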
## HDFS and Hadoop File System support
......
......@@ -154,6 +154,13 @@
<fileMode>0644</fileMode>
</file>
<file>
<source>../flink-filesystems/flink-azure-fs-hadoop/target/flink-azure-fs-hadoop-${project.version}.jar</source>
<outputDirectory>opt/</outputDirectory>
<destName>flink-azure-fs-hadoop-${project.version}.jar</destName>
<fileMode>0644</fileMode>
</file>
<!-- Queryable State -->
<file>
<source>../flink-queryable-state/flink-queryable-state-runtime/target/flink-queryable-state-runtime_${scala.binary.version}-${project.version}.jar</source>
......
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Tests for Azure file system.
# To run a single test, export IT_CASE_AZURE_ACCOUNT, IT_CASE_AZURE_ACCESS_KEY and IT_CASE_AZURE_CONTAINER to
# the appropriate values and run:
# flink-end-to-end-tests/run-single-test.sh skip flink-end-to-end-tests/test-scripts/test_azure_fs.sh
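#
# For example (placeholder values; export real credentials before running):
#   export IT_CASE_AZURE_ACCOUNT=<your-account>
#   export IT_CASE_AZURE_ACCESS_KEY=<your-access-key>
#   export IT_CASE_AZURE_CONTAINER=<your-container>
#   flink-end-to-end-tests/run-single-test.sh skip flink-end-to-end-tests/test-scripts/test_azure_fs.sh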
source "$(dirname "$0")"/common.sh
if [[ -z "$IT_CASE_AZURE_ACCOUNT" ]]; then
echo "Did not find Azure storage account environment variable, NOT running the e2e test."
exit 0
else
echo "Found Azure storage account $IT_CASE_AZURE_ACCOUNT, running the e2e test."
fi
if [[ -z "$IT_CASE_AZURE_ACCESS_KEY" ]]; then
echo "Did not find Azure storage access key environment variable, NOT running the e2e test."
exit 0
else
echo "Found Azure storage access key $IT_CASE_AZURE_ACCESS_KEY, running the e2e test."
fi
if [[ -z "$IT_CASE_AZURE_CONTAINER" ]]; then
echo "Did not find Azure storage container environment variable, NOT running the e2e test."
exit 0
else
echo "Found Azure storage container $IT_CASE_AZURE_CONTAINER, running the e2e test."
fi
AZURE_TEST_DATA_WORDS_URI="wasbs://$IT_CASE_AZURE_CONTAINER@$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net/words"
###################################
# Setup Flink Azure access.
#
# Globals:
# FLINK_DIR
# IT_CASE_AZURE_ACCOUNT
# IT_CASE_AZURE_ACCESS_KEY
# Returns:
# None
###################################
function azure_setup {
# make sure we delete the file at the end
function azure_cleanup {
rm $FLINK_DIR/lib/flink-azure-fs*.jar
# remove any leftover settings
sed -i -e 's/fs.azure.account.key.*//' "$FLINK_DIR/conf/flink-conf.yaml"
}
trap azure_cleanup EXIT
echo "Copying flink azure jars and writing out configs"
cp $FLINK_DIR/opt/flink-azure-fs-hadoop-*.jar $FLINK_DIR/lib/
echo "fs.azure.account.key.$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net: $IT_CASE_AZURE_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
}
azure_setup
echo "Starting Flink cluster.."
start_cluster
$FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input $AZURE_TEST_DATA_WORDS_URI --output $TEST_DATA_DIR/out/wc_out
check_result_hash "WordCountWithAzureFS" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-filesystems</artifactId>
<version>1.9-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-azure-fs-hadoop</artifactId>
<name>flink-azure-fs-hadoop</name>
<packaging>jar</packaging>
<!-- need to use a release which includes this patch: https://github.com/apache/hadoop/commit/02cadbd24bf69925078d044701741e2e3fcb4b2f -->
<properties>
<fs.azure.version>2.7.0</fs.azure.version>
<fs.azure.sdk.version>1.16.0</fs.azure.sdk.version>
<fs.jackson.core.version>2.9.4</fs.jackson.core.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-fs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure</artifactId>
<version>${fs.azure.version}</version>
</dependency>
<!-- for the Azure HDFS related tests -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- for Azure IT tests to check if HTTP endpoints are enabled / not -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure</artifactId>
<version>${fs.azure.sdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${fs.jackson.core.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
</dependency>
<!-- for the behavior test suite -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Relocate all Azure related classes -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<artifactSet>
<includes>
<include>*:*</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.apache.hadoop</pattern>
<shadedPattern>org.apache.flink.fs.shaded.hadoop.org.apache.hadoop</shadedPattern>
</relocation>
<!-- relocate the azure-storage dependencies -->
<relocation>
<pattern>com.microsoft.azure.storage</pattern>
<shadedPattern>org.apache.flink.fs.shaded.com.microsoft.azure.storage</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
<artifact>*</artifact>
<excludes>
<exclude>properties.dtd</exclude>
<exclude>PropertyList-1.0.dtd</exclude>
<exclude>mozilla/**</exclude>
<exclude>META-INF/maven/**</exclude>
<exclude>META-INF/LICENSE.txt</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
<exclude>core-default.xml</exclude>
<exclude>hdfs-default.xml</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.fs.azurefs;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Set;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Abstract factory for AzureFS. Subclasses override to specify
* the correct scheme (wasb / wasbs). Based on Azure HDFS support in the
* <a href="https://hadoop.apache.org/docs/current/hadoop-azure/index.html">hadoop-azure</a> module.
*/
public abstract class AbstractAzureFSFactory implements FileSystemFactory {
private static final Logger LOG = LoggerFactory.getLogger(AbstractAzureFSFactory.class);
private static final String[] FLINK_CONFIG_PREFIXES = { "fs.azure.", "azure." };
private static final String HADOOP_CONFIG_PREFIX = "fs.azure.";
private static final String[][] MIRRORED_CONFIG_KEYS = {};
private static final Set<String> PACKAGE_PREFIXES_TO_SHADE = Collections.emptySet();
private static final Set<String> CONFIG_KEYS_TO_SHADE = Collections.emptySet();
private static final String FLINK_SHADING_PREFIX = "";
private final HadoopConfigLoader configLoader;
private Configuration flinkConfig;
public AbstractAzureFSFactory() {
this.configLoader = new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS,
HADOOP_CONFIG_PREFIX, PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX);
}
@Override
public void configure(Configuration config) {
flinkConfig = config;
configLoader.setFlinkConfig(config);
}
@Override
public FileSystem create(URI fsUri) throws IOException {
checkNotNull(fsUri, "passed file system URI object should not be null");
LOG.info("Trying to load and instantiate Azure File System");
return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig));
}
// uri is of the form: wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDir
private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(URI fsUri, Configuration flinkConfig) throws IOException {
org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig();
org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem();
azureFS.initialize(fsUri, hadoopConfig);
return azureFS;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.fs.azurefs;
/**
* A factory for the Azure file system over HTTP.
*/
public class AzureFSFactory extends AbstractAzureFSFactory {
@Override
public String getScheme() {
return "wasb";
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.fs.azurefs;
/**
* A factory for the Azure file system over HTTPS.
*/
public class SecureAzureFSFactory extends AbstractAzureFSFactory {
@Override
public String getScheme() {
return "wasbs";
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.flink.fs.azurefs.AzureFSFactory
org.apache.flink.fs.azurefs.SecureAzureFSFactory
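This services file is what makes the two factories discoverable: Flink locates `FileSystemFactory` implementations on the classpath through Java's standard `ServiceLoader` mechanism. A minimal, illustrative sketch of that discovery step (Flink performs the equivalent lookup inside its own `FileSystem` initialization; the class below is hypothetical):

import org.apache.flink.core.fs.FileSystemFactory;

import java.util.ServiceLoader;

public class ListFileSystemFactories {
    public static void main(String[] args) {
        // Iterates over every factory registered in
        // META-INF/services/org.apache.flink.core.fs.FileSystemFactory.
        for (FileSystemFactory factory : ServiceLoader.load(FileSystemFactory.class)) {
            System.out.println(factory.getScheme()); // e.g. "wasb", "wasbs"
        }
    }
}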
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.fs.azurefs;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.azure.AzureException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
/**
* Tests for the AzureFSFactory.
*/
@RunWith(Parameterized.class)
public class AzureFSFactoryTest extends TestLogger {
@Parameterized.Parameter
public String scheme;
@Parameterized.Parameters(name = "Scheme = {0}")
public static List<String> parameters() {
return Arrays.asList("wasb", "wasbs");
}
@Rule
public final ExpectedException exception = ExpectedException.none();
private AbstractAzureFSFactory getFactory(String scheme) {
return scheme.equals("wasb") ? new AzureFSFactory() : new SecureAzureFSFactory();
}
@Test
public void testNullFsURI() throws Exception {
URI uri = null;
AbstractAzureFSFactory factory = getFactory(scheme);
exception.expect(NullPointerException.class);
exception.expectMessage("passed file system URI object should not be null");
factory.create(uri);
}
// missing credentials
@Test
public void testCreateFsWithAuthorityMissingCreds() throws Exception {
String uriString = String.format("%s://yourcontainer@youraccount.blob.core.windows.net/testDir", scheme);
final URI uri = URI.create(uriString);
exception.expect(AzureException.class);
AbstractAzureFSFactory factory = getFactory(scheme);
Configuration config = new Configuration();
config.setInteger("fs.azure.io.retry.max.retries", 0);
factory.configure(config);
factory.create(uri);
}
@Test
public void testCreateFsWithMissingAuthority() throws Exception {
String uriString = String.format("%s:///my/path", scheme);
final URI uri = URI.create(uriString);
exception.expect(IllegalArgumentException.class);
exception.expectMessage("Cannot initialize WASB file system, URI authority not recognized.");
AbstractAzureFSFactory factory = getFactory(scheme);
factory.configure(new Configuration());
factory.create(uri);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.fs.azurefs;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.StringUtils;
import com.microsoft.azure.credentials.ApplicationTokenCredentials;
import com.microsoft.azure.credentials.AzureTokenCredentials;
import com.microsoft.azure.management.Azure;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* An implementation of the {@link FileSystemBehaviorTestSuite} for the Azure-based
* file system.
*/
@RunWith(Parameterized.class)
public class AzureFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {
@Parameterized.Parameter
public String scheme;
private static final String CONTAINER = System.getenv("ARTIFACTS_AZURE_CONTAINER");
private static final String ACCOUNT = System.getenv("ARTIFACTS_AZURE_STORAGE_ACCOUNT");
private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AZURE_ACCESS_KEY");
private static final String RESOURCE_GROUP = System.getenv("ARTIFACTS_AZURE_RESOURCE_GROUP");
private static final String SUBSCRIPTION_ID = System.getenv("ARTIFACTS_AZURE_SUBSCRIPTION_ID");
private static final String TOKEN_CREDENTIALS_FILE = System.getenv("ARTIFACTS_AZURE_TOKEN_CREDENTIALS_FILE");
private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID();
// Azure Blob Storage defaults to https-only storage accounts. We check on a best-effort basis
// whether http support has been enabled and, if so, test http as well.
@Parameterized.Parameters(name = "Scheme = {0}")
public static List<String> parameters() throws IOException {
boolean httpsOnly = isHttpsTrafficOnly();
return httpsOnly ? Arrays.asList("wasbs") : Arrays.asList("wasb", "wasbs");
}
private static boolean isHttpsTrafficOnly() throws IOException {
if (StringUtils.isNullOrWhitespaceOnly(RESOURCE_GROUP) || StringUtils.isNullOrWhitespaceOnly(TOKEN_CREDENTIALS_FILE)) {
// default to https only, as some fields are missing
return true;
}
Assume.assumeTrue("Azure storage account not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCOUNT));
AzureTokenCredentials credentials = ApplicationTokenCredentials.fromFile(new File(TOKEN_CREDENTIALS_FILE));
Azure azure =
StringUtils.isNullOrWhitespaceOnly(SUBSCRIPTION_ID) ?
Azure.authenticate(credentials).withDefaultSubscription() :
Azure.authenticate(credentials).withSubscription(SUBSCRIPTION_ID);
return azure.storageAccounts().getByResourceGroup(RESOURCE_GROUP, ACCOUNT).inner().enableHttpsTrafficOnly();
}
@BeforeClass
public static void checkCredentialsAndSetup() throws IOException {
// check whether credentials and container details exist
Assume.assumeTrue("Azure container not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(CONTAINER));
Assume.assumeTrue("Azure access key not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY));
// initialize configuration with valid credentials
final Configuration conf = new Configuration();
// fs.azure.account.key.youraccount.blob.core.windows.net = ACCESS_KEY
conf.setString("fs.azure.account.key." + ACCOUNT + ".blob.core.windows.net", ACCESS_KEY);
FileSystem.initialize(conf);
}
@AfterClass
public static void clearFsConfig() throws IOException {
FileSystem.initialize(new Configuration());
}
@Override
public FileSystem getFileSystem() throws Exception {
return getBasePath().getFileSystem();
}
@Override
public Path getBasePath() {
// wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDataDir
String uriString = scheme + "://" + CONTAINER + '@' + ACCOUNT + ".blob.core.windows.net/" + TEST_DATA_DIR;
return new Path(uriString);
}
@Test
public void testSimpleFileWriteAndRead() throws Exception {
final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
final String testLine = "Hello Upload!";
final Path path = new Path(getBasePath() + "/test.txt");
final FileSystem fs = path.getFileSystem();
try {
try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE);
OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
writer.write(testLine);
}
// just in case, wait for the path to exist
checkPathEventualExistence(fs, path, true, deadline);
try (FSDataInputStream in = fs.open(path);
InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8);
BufferedReader reader = new BufferedReader(ir)) {
String line = reader.readLine();
assertEquals(testLine, line);
}
}
finally {
fs.delete(path, false);
}
// now file must be gone
checkPathEventualExistence(fs, path, false, deadline);
}
@Test
public void testDirectoryListing() throws Exception {
final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs
final Path directory = new Path(getBasePath() + "/testdir/");
final FileSystem fs = directory.getFileSystem();
// directory must not yet exist
assertFalse(fs.exists(directory));
try {
// create directory
assertTrue(fs.mkdirs(directory));
checkPathEventualExistence(fs, directory, true, deadline);
// directory empty
assertEquals(0, fs.listStatus(directory).length);
// create some files
final int numFiles = 3;
for (int i = 0; i < numFiles; i++) {
Path file = new Path(directory, "/file-" + i);
try (FSDataOutputStream out = fs.create(file, FileSystem.WriteMode.OVERWRITE);
OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) {
writer.write("hello-" + i + "\n");
}
// just in case, wait for the file to exist (should then also be reflected in the
// directory's file list below)
checkPathEventualExistence(fs, file, true, deadline);
}
FileStatus[] files = fs.listStatus(directory);
assertNotNull(files);
assertEquals(3, files.length);
for (FileStatus status : files) {
assertFalse(status.isDir());
}
// now that there are files, the directory must exist
assertTrue(fs.exists(directory));
}
finally {
// clean up
fs.delete(directory, true);
}
// now directory must be gone
checkPathEventualExistence(fs, directory, false, deadline);
}
@Override
public FileSystemKind getFileSystemKind() {
return FileSystemKind.OBJECT_STORE;
}
}
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
log4j.rootLogger=OFF, testlogger
# testlogger is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err
log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
......@@ -224,8 +224,8 @@ public class HadoopFileSystem extends FileSystem {
static FileSystemKind getKindForScheme(String scheme) {
scheme = scheme.toLowerCase(Locale.US);
if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss")) {
// the Amazon S3 storage or Aliyun OSS storage
if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss") || scheme.startsWith("wasb")) {
// the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
return FileSystemKind.OBJECT_STORE;
}
else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.fs.s3.common;
package org.apache.flink.runtime.util;
import org.apache.flink.configuration.Configuration;
......
......@@ -166,7 +166,7 @@ under the License.
<filter>
<artifact>org.apache.flink:flink-hadoop-fs</artifact>
<excludes>
<exclude>org/apache/flink/runtime/util/**</exclude>
<exclude>org/apache/flink/runtime/util/HadoopUtils</exclude>
<exclude>org/apache/flink/runtime/fs/hdfs/HadoopRecoverable*</exclude>
</excludes>
</filter>
......
......@@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
......
......@@ -20,6 +20,7 @@ package org.apache.flink.fs.s3.common;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.FileSystem;
......
......@@ -106,6 +106,11 @@ under the License.
<pattern>com.amazon</pattern>
<shadedPattern>org.apache.flink.fs.s3base.shaded.com.amazon</shadedPattern>
</relocation>
<!-- shade Flink's Hadoop FS utility classes -->
<relocation>
<pattern>org.apache.flink.runtime.util</pattern>
<shadedPattern>org.apache.flink.fs.s3hadoop.common</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
......
......@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3hadoop;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
......
......@@ -19,7 +19,7 @@
package org.apache.flink.fs.s3hadoop;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.junit.Test;
......
......@@ -290,6 +290,12 @@ under the License.
<pattern>com.google</pattern>
<shadedPattern>org.apache.flink.fs.s3presto.shaded.com.google</shadedPattern>
</relocation>
<!-- shade Flink's Hadoop FS utility classes -->
<relocation>
<pattern>org.apache.flink.runtime.util</pattern>
<shadedPattern>org.apache.flink.fs.s3presto.common</shadedPattern>
</relocation>
</relocations>
<filters>
<filter>
......
......@@ -20,8 +20,8 @@ package org.apache.flink.fs.s3presto;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory;
import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import org.apache.flink.fs.s3.common.writer.S3AccessHelper;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import org.apache.flink.util.FlinkRuntimeException;
import com.facebook.presto.hive.s3.PrestoS3FileSystem;
......
......@@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.fs.s3.common.FlinkS3FileSystem;
import org.apache.flink.fs.s3.common.HadoopConfigLoader;
import org.apache.flink.runtime.util.HadoopConfigLoader;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
......
......@@ -47,6 +47,7 @@ under the License.
<module>flink-s3-fs-presto</module>
<module>flink-swift-fs-hadoop</module>
<module>flink-oss-fs-hadoop</module>
<module>flink-azure-fs-hadoop</module>
</modules>
<!-- Common dependency setup for all filesystems -->
......