From e00ec88601583d370e14d7d969b20ab1cbc6ce3e Mon Sep 17 00:00:00 2001 From: Piyush Narang Date: Thu, 4 Apr 2019 16:00:46 -0400 Subject: [PATCH] [FLINK-12115][fs] Add support for AzureFS Check for http enabled storage accounts in AzureFS IT tests Add AzureFS standalone E2E test --- docs/ops/filesystems/azure.md | 77 ++++++ docs/ops/filesystems/azure.zh.md | 77 ++++++ docs/ops/filesystems/index.md | 9 +- docs/ops/filesystems/index.zh.md | 7 +- flink-dist/src/main/assemblies/opt.xml | 7 + .../test-scripts/test_azure_fs.sh | 83 +++++++ .../flink-azure-fs-hadoop/pom.xml | 155 ++++++++++++ .../fs/azurefs/AbstractAzureFSFactory.java | 85 +++++++ .../flink/fs/azurefs/AzureFSFactory.java | 30 +++ .../fs/azurefs/SecureAzureFSFactory.java | 30 +++ ...org.apache.flink.core.fs.FileSystemFactory | 17 ++ .../flink/fs/azurefs/AzureFSFactoryTest.java | 94 ++++++++ .../AzureFileSystemBehaviorITCase.java | 220 ++++++++++++++++++ .../src/test/resources/log4j-test.properties | 27 +++ .../runtime/fs/hdfs/HadoopFileSystem.java | 4 +- .../runtime/util}/HadoopConfigLoader.java | 2 +- flink-filesystems/flink-s3-fs-base/pom.xml | 2 +- .../common/AbstractS3FileSystemFactory.java | 1 + .../fs/s3/common/S3EntropyFsFactoryTest.java | 1 + flink-filesystems/flink-s3-fs-hadoop/pom.xml | 5 + .../fs/s3hadoop/S3FileSystemFactory.java | 2 +- .../fs/s3hadoop/HadoopS3FileSystemTest.java | 2 +- flink-filesystems/flink-s3-fs-presto/pom.xml | 6 + .../fs/s3presto/S3FileSystemFactory.java | 2 +- .../fs/s3presto/PrestoS3FileSystemTest.java | 2 +- flink-filesystems/pom.xml | 1 + 26 files changed, 937 insertions(+), 11 deletions(-) create mode 100644 docs/ops/filesystems/azure.md create mode 100644 docs/ops/filesystems/azure.zh.md create mode 100755 flink-end-to-end-tests/test-scripts/test_azure_fs.sh create mode 100644 flink-filesystems/flink-azure-fs-hadoop/pom.xml create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java create mode 
100644 flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java create mode 100644 flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties rename flink-filesystems/{flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common => flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util}/HadoopConfigLoader.java (99%) diff --git a/docs/ops/filesystems/azure.md b/docs/ops/filesystems/azure.md new file mode 100644 index 00000000000..36720c80a11 --- /dev/null +++ b/docs/ops/filesystems/azure.md @@ -0,0 +1,77 @@ +--- +title: "Azure Blob Storage" +nav-title: Azure Blob Storage +nav-parent_id: filesystems +nav-pos: 3 +--- + + +[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases. 
+You can use Azure Blob Storage with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html) + +* This will be replaced by the TOC +{:toc} + +You can use Azure Blob Storage objects like regular files by specifying paths in the following format: + +{% highlight plain %} +wasb://@$.blob.core.windows.net/ + +// SSL encrypted access +wasbs://@$.blob.core.windows.net/ +{% endhighlight %} + +Below shows how to use Azure Blob Storage with Flink: + +{% highlight java %} +// Read from Azure Blob storage +env.readTextFile("wasb://@$.blob.core.windows.net/"); + +// Write to Azure Blob storage +stream.writeAsText("wasb://@$.blob.core.windows.net/") + +// Use Azure Blob Storage as FsStatebackend +env.setStateBackend(new FsStateBackend("wasb://@$.blob.core.windows.net/")); +{% endhighlight %} + +### Shaded Hadoop Azure Blob Storage file system + +To use `flink-azure-fs-hadoop,` copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink, e.g. + +{% highlight bash %} +cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/ +{% endhighlight %} + +`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) scheme. + +#### Configurations setup +After setting up the Azure Blob Storage FileSystem wrapper, you need to configure credentials to make sure that Flink is allowed to access Azure Blob Storage. + +To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml` + +You can see the configuration keys in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials). 
+ +There are some required configurations that must be added to `flink-conf.yaml`: + +{% highlight yaml %} +fs.azure.account.key.youraccount.blob.core.windows.net: Azure Blob Storage access key +{% endhighlight %} + +{% top %} diff --git a/docs/ops/filesystems/azure.zh.md b/docs/ops/filesystems/azure.zh.md new file mode 100644 index 00000000000..36720c80a11 --- /dev/null +++ b/docs/ops/filesystems/azure.zh.md @@ -0,0 +1,77 @@ +--- +title: "Azure Blob Storage" +nav-title: Azure Blob Storage +nav-parent_id: filesystems +nav-pos: 3 +--- + + +[Azure Blob Storage](https://docs.microsoft.com/en-us/azure/storage/) is a Microsoft-managed service providing cloud storage for a variety of use cases. +You can use Azure Blob Storage with Flink for **reading** and **writing data** as well in conjunction with the [streaming **state backends**]({{ site.baseurl }}/ops/state/state_backends.html) + +* This will be replaced by the TOC +{:toc} + +You can use Azure Blob Storage objects like regular files by specifying paths in the following format: + +{% highlight plain %} +wasb://@$.blob.core.windows.net/ + +// SSL encrypted access +wasbs://@$.blob.core.windows.net/ +{% endhighlight %} + +Below shows how to use Azure Blob Storage with Flink: + +{% highlight java %} +// Read from Azure Blob storage +env.readTextFile("wasb://@$.blob.core.windows.net/"); + +// Write to Azure Blob storage +stream.writeAsText("wasb://@$.blob.core.windows.net/") + +// Use Azure Blob Storage as FsStatebackend +env.setStateBackend(new FsStateBackend("wasb://@$.blob.core.windows.net/")); +{% endhighlight %} + +### Shaded Hadoop Azure Blob Storage file system + +To use `flink-azure-fs-hadoop,` copy the respective JAR file from the opt directory to the lib directory of your Flink distribution before starting Flink, e.g. 
+ +{% highlight bash %} +cp ./opt/flink-azure-fs-hadoop-{{ site.version }}.jar ./lib/ +{% endhighlight %} + +`flink-azure-fs-hadoop` registers default FileSystem wrappers for URIs with the *wasb://* and *wasbs://* (SSL encrypted access) scheme. + +#### Configurations setup +After setting up the Azure Blob Storage FileSystem wrapper, you need to configure credentials to make sure that Flink is allowed to access Azure Blob Storage. + +To allow for easy adoption, you can use the same configuration keys in `flink-conf.yaml` as in Hadoop's `core-site.xml` + +You can see the configuration keys in the [Hadoop Azure Blob Storage documentation](https://hadoop.apache.org/docs/current/hadoop-azure/index.html#Configuring_Credentials). + +There are some required configurations that must be added to `flink-conf.yaml`: + +{% highlight yaml %} +fs.azure.account.key.youraccount.blob.core.windows.net: Azure Blob Storage access key +{% endhighlight %} + +{% top %} diff --git a/docs/ops/filesystems/index.md b/docs/ops/filesystems/index.md index 0d4a1bebf4d..eb4087dd8ab 100644 --- a/docs/ops/filesystems/index.md +++ b/docs/ops/filesystems/index.md @@ -25,7 +25,7 @@ under the License. --> Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery. -These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*. +These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, *Aliyun OSS* and *Azure Blob Storage*. The file system used for a particular file is determined by its URI scheme. For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster. 
@@ -43,12 +43,17 @@ Flink ships with implementations for the following file systems: - **S3**: Flink directly provides file systems to talk to Amazon S3 with two alternative implementations, `flink-s3-fs-presto` and `flink-s3-fs-hadoop`. Both implementations are self-contained with no dependency footprint. - - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath. + - **MapR FS**: The MapR file system *"maprfs://"* is automatically available when the MapR libraries are in the classpath. - **OpenStack Swift FS**: Flink directly provides a file system to talk to the OpenStack Swift file system, registered under the scheme *"swift://"*. The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}` When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder. + + - **Azure Blob Storage**: + Flink directly provides a file system to work with Azure Blob Storage. + This filesystem is registered under the scheme *"wasb(s)://"*. + The implementation is self-contained with no dependency footprint. ## HDFS and Hadoop File System support diff --git a/docs/ops/filesystems/index.zh.md b/docs/ops/filesystems/index.zh.md index 0d4a1bebf4d..414c82f773e 100644 --- a/docs/ops/filesystems/index.zh.md +++ b/docs/ops/filesystems/index.zh.md @@ -25,7 +25,7 @@ under the License. --> Apache Flink uses file systems to consume and persistently store data, both for the results of applications and for fault tolerance and recovery. -These are some of most of the popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS* and *Aliyun OSS*. 
+These are some of the most popular file systems, including *local*, *hadoop-compatible*, *S3*, *MapR FS*, *OpenStack Swift FS*, *Aliyun OSS* and *Azure Blob Storage*. The file system used for a particular file is determined by its URI scheme. For example, `file:///home/user/text.txt` refers to a file in the local file system, while `hdfs://namenode:50010/data/user/text.txt` is a file in a specific HDFS cluster. @@ -49,6 +49,11 @@ Flink ships with implementations for the following file systems: The implementation of `flink-swift-fs-hadoop` is based on the [Hadoop Project](https://hadoop.apache.org/) but is self-contained with no dependency footprint. To use it when using Flink as a library, add the respective maven dependency (`org.apache.flink:flink-swift-fs-hadoop:{{ site.version }}` When starting a Flink application from the Flink binaries, copy or move the respective jar file from the `opt` folder to the `lib` folder. + + - **Azure Blob Storage**: + Flink directly provides a file system to work with Azure Blob Storage. + This filesystem is registered under the scheme *"wasb(s)://"*. + The implementation is self-contained with no dependency footprint. 
## HDFS and Hadoop File System support diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml index 1ce6e8899e8..e28acd8c962 100644 --- a/flink-dist/src/main/assemblies/opt.xml +++ b/flink-dist/src/main/assemblies/opt.xml @@ -154,6 +154,13 @@ 0644 + + ../flink-filesystems/flink-azure-fs-hadoop/target/flink-azure-fs-hadoop-${project.version}.jar + opt/ + flink-azure-fs-hadoop-${project.version}.jar + 0644 + + ../flink-queryable-state/flink-queryable-state-runtime/target/flink-queryable-state-runtime_${scala.binary.version}-${project.version}.jar diff --git a/flink-end-to-end-tests/test-scripts/test_azure_fs.sh b/flink-end-to-end-tests/test-scripts/test_azure_fs.sh new file mode 100755 index 00000000000..40c6962d4a4 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_azure_fs.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Tests for Azure file system. 
+ +# To run single test, export IT_CASE_AZURE_ACCOUNT, IT_CASE_AZURE_ACCESS_KEY, IT_CASE_AZURE_CONTAINER to +# the appropriate values and run: +# flink-end-to-end-tests/run-single-test.sh skip flink-end-to-end-tests/test-scripts/test_azure_fs.sh + +source "$(dirname "$0")"/common.sh + +if [[ -z "$IT_CASE_AZURE_ACCOUNT" ]]; then + echo "Did not find Azure storage account environment variable, NOT running the e2e test." + exit 0 +else + echo "Found Azure storage account $IT_CASE_AZURE_ACCOUNT, running the e2e test." +fi + +if [[ -z "$IT_CASE_AZURE_ACCESS_KEY" ]]; then + echo "Did not find Azure storage access key environment variable, NOT running the e2e test." + exit 0 +else + echo "Found Azure storage access key $IT_CASE_AZURE_ACCESS_KEY, running the e2e test." +fi + +if [[ -z "$IT_CASE_AZURE_CONTAINER" ]]; then + echo "Did not find Azure storage container environment variable, NOT running the e2e test." + exit 0 +else + echo "Found Azure storage container $IT_CASE_AZURE_CONTAINER, running the e2e test." +fi + +AZURE_TEST_DATA_WORDS_URI="wasbs://$IT_CASE_AZURE_CONTAINER@$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net/words" + +################################### +# Setup Flink Azure access. +# +# Globals: +# FLINK_DIR +# IT_CASE_AZURE_ACCOUNT +# IT_CASE_AZURE_ACCESS_KEY +# Returns: +# None +################################### +function azure_setup { + # make sure we delete the file at the end + function azure_cleanup { + rm $FLINK_DIR/lib/flink-azure-fs*.jar + + # remove any leftover settings + sed -i -e 's/fs.azure.account.key.*//' "$FLINK_DIR/conf/flink-conf.yaml" + } + trap azure_cleanup EXIT + + echo "Copying flink azure jars and writing out configs" + cp $FLINK_DIR/opt/flink-azure-fs-hadoop-*.jar $FLINK_DIR/lib/ + echo "fs.azure.account.key.$IT_CASE_AZURE_ACCOUNT.blob.core.windows.net: $IT_CASE_AZURE_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml" +} + +azure_setup + +echo "Starting Flink cluster.." 
+start_cluster + +$FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input $AZURE_TEST_DATA_WORDS_URI --output $TEST_DATA_DIR/out/wc_out + +check_result_hash "WordCountWithAzureFS" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5" diff --git a/flink-filesystems/flink-azure-fs-hadoop/pom.xml b/flink-filesystems/flink-azure-fs-hadoop/pom.xml new file mode 100644 index 00000000000..37567ce633e --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/pom.xml @@ -0,0 +1,155 @@ + + + + + 4.0.0 + + + org.apache.flink + flink-filesystems + 1.9-SNAPSHOT + .. + + + flink-azure-fs-hadoop + flink-azure-fs-hadoop + + jar + + + + 2.7.0 + 1.16.0 + 2.9.4 + + + + + + org.apache.flink + flink-core + ${project.version} + provided + + + + org.apache.flink + flink-hadoop-fs + ${project.version} + + + + org.apache.hadoop + hadoop-azure + ${fs.azure.version} + + + + + org.apache.hadoop + hadoop-hdfs + ${hadoop.version} + + + + + com.microsoft.azure + azure + ${fs.azure.sdk.version} + test + + + com.fasterxml.jackson.core + jackson-core + ${fs.jackson.core.version} + test + + + com.google.guava + guava + ${guava.version} + test + + + + + org.apache.flink + flink-core + ${project.version} + test + test-jar + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade-flink + package + + shade + + + false + + + *:* + + + + + org.apache.hadoop + org.apache.flink.fs.shaded.hadoop.org.apache.hadoop + + + + com.microsoft.azure.storage + org.apache.flink.fs.shaded.com.microsoft.azure.storage + + + + + * + + properties.dtd + PropertyList-1.0.dtd + mozilla/** + META-INF/maven/** + META-INF/LICENSE.txt + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + core-default.xml + hdfs-default.xml + + + + + + + + + + diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java new file 
mode 100644 index 00000000000..7ae9df8cc45 --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AbstractAzureFSFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemFactory; +import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem; +import org.apache.flink.runtime.util.HadoopConfigLoader; + +import org.apache.hadoop.fs.azure.NativeAzureFileSystem; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Abstract factory for AzureFS. Subclasses override to specify + * the correct scheme (wasb / wasbs). Based on Azure HDFS support in the + * hadoop-azure module. 
+ */ +public abstract class AbstractAzureFSFactory implements FileSystemFactory { + private static final Logger LOG = LoggerFactory.getLogger(AbstractAzureFSFactory.class); + + private static final String[] FLINK_CONFIG_PREFIXES = { "fs.azure.", "azure." }; + private static final String HADOOP_CONFIG_PREFIX = "fs.azure."; + + private static final String[][] MIRRORED_CONFIG_KEYS = {}; + private static final Set PACKAGE_PREFIXES_TO_SHADE = Collections.emptySet(); + private static final Set CONFIG_KEYS_TO_SHADE = Collections.emptySet(); + private static final String FLINK_SHADING_PREFIX = ""; + + private final HadoopConfigLoader configLoader; + + private Configuration flinkConfig; + + public AbstractAzureFSFactory() { + this.configLoader = new HadoopConfigLoader(FLINK_CONFIG_PREFIXES, MIRRORED_CONFIG_KEYS, + HADOOP_CONFIG_PREFIX, PACKAGE_PREFIXES_TO_SHADE, CONFIG_KEYS_TO_SHADE, FLINK_SHADING_PREFIX); + } + + @Override + public void configure(Configuration config) { + flinkConfig = config; + configLoader.setFlinkConfig(config); + } + + @Override + public FileSystem create(URI fsUri) throws IOException { + checkNotNull(fsUri, "passed file system URI object should not be null"); + LOG.info("Trying to load and instantiate Azure File System"); + return new HadoopFileSystem(createInitializedAzureFS(fsUri, flinkConfig)); + } + + // uri is of the form: wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDir + private org.apache.hadoop.fs.FileSystem createInitializedAzureFS(URI fsUri, Configuration flinkConfig) throws IOException { + org.apache.hadoop.conf.Configuration hadoopConfig = configLoader.getOrLoadHadoopConfig(); + + org.apache.hadoop.fs.FileSystem azureFS = new NativeAzureFileSystem(); + azureFS.initialize(fsUri, hadoopConfig); + + return azureFS; + } +} diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java 
b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java new file mode 100644 index 00000000000..5f6246d7f93 --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/AzureFSFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +/** + * A factory for the Azure file system over HTTP. + */ +public class AzureFSFactory extends AbstractAzureFSFactory { + + @Override + public String getScheme() { + return "wasb"; + } +} diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java new file mode 100644 index 00000000000..7130a879c44 --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/java/org/apache/flink/fs/azurefs/SecureAzureFSFactory.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +/** + * A factory for the Azure file system over HTTPs. + */ +public class SecureAzureFSFactory extends AbstractAzureFSFactory { + + @Override + public String getScheme() { + return "wasbs"; + } +} diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory new file mode 100644 index 00000000000..4d6a19aa54e --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.fs.azurefs.AzureFSFactory +org.apache.flink.fs.azurefs.SecureAzureFSFactory diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java new file mode 100644 index 00000000000..01b79b5884f --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFSFactoryTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.azurefs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.TestLogger; + +import org.apache.hadoop.fs.azure.AzureException; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.net.URI; +import java.util.Arrays; +import java.util.List; + +/** + * Tests for the AzureFSFactory. 
+ */ +@RunWith(Parameterized.class) +public class AzureFSFactoryTest extends TestLogger { + + @Parameterized.Parameter + public String scheme; + + @Parameterized.Parameters(name = "Scheme = {0}") + public static List parameters() { + return Arrays.asList("wasb", "wasbs"); + } + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + private AbstractAzureFSFactory getFactory(String scheme) { + return scheme.equals("wasb") ? new AzureFSFactory() : new SecureAzureFSFactory(); + } + + @Test + public void testNullFsURI() throws Exception { + URI uri = null; + AbstractAzureFSFactory factory = getFactory(scheme); + + exception.expect(NullPointerException.class); + exception.expectMessage("passed file system URI object should not be null"); + + factory.create(uri); + } + + // missing credentials + @Test + public void testCreateFsWithAuthorityMissingCreds() throws Exception { + String uriString = String.format("%s://yourcontainer@youraccount.blob.core.windows.net/testDir", scheme); + final URI uri = URI.create(uriString); + + exception.expect(AzureException.class); + + AbstractAzureFSFactory factory = getFactory(scheme); + Configuration config = new Configuration(); + config.setInteger("fs.azure.io.retry.max.retries", 0); + factory.configure(config); + factory.create(uri); + } + + @Test + public void testCreateFsWithMissingAuthority() throws Exception { + String uriString = String.format("%s:///my/path", scheme); + final URI uri = URI.create(uriString); + + exception.expect(IllegalArgumentException.class); + exception.expectMessage("Cannot initialize WASB file system, URI authority not recognized."); + + AbstractAzureFSFactory factory = getFactory(scheme); + factory.configure(new Configuration()); + factory.create(uri); + } +} diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java 
b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java new file mode 100644 index 00000000000..6c65be90cbd --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureFileSystemBehaviorITCase.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.fs.azurefs; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.FileSystemBehaviorTestSuite; +import org.apache.flink.core.fs.FileSystemKind; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.StringUtils; + +import com.microsoft.azure.credentials.ApplicationTokenCredentials; +import com.microsoft.azure.credentials.AzureTokenCredentials; +import com.microsoft.azure.management.Azure; +import org.junit.AfterClass; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import static org.apache.flink.core.fs.FileSystemTestUtils.checkPathEventualExistence; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * An implementation of the {@link FileSystemBehaviorTestSuite} for Azure based + * file system. 
+ */ +@RunWith(Parameterized.class) +public class AzureFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite { + + @Parameterized.Parameter + public String scheme; + + private static final String CONTAINER = System.getenv("ARTIFACTS_AZURE_CONTAINER"); + private static final String ACCOUNT = System.getenv("ARTIFACTS_AZURE_STORAGE_ACCOUNT"); + private static final String ACCESS_KEY = System.getenv("ARTIFACTS_AZURE_ACCESS_KEY"); + private static final String RESOURCE_GROUP = System.getenv("ARTIFACTS_AZURE_RESOURCE_GROUP"); + private static final String SUBSCRIPTION_ID = System.getenv("ARTIFACTS_AZURE_SUBSCRIPTION_ID"); + private static final String TOKEN_CREDENTIALS_FILE = System.getenv("ARTIFACTS_AZURE_TOKEN_CREDENTIALS_FILE"); + + private static final String TEST_DATA_DIR = "tests-" + UUID.randomUUID(); + + // Azure Blob Storage defaults to https only storage accounts. We check if http support has been + // enabled on a best effort basis and test http if so. + @Parameterized.Parameters(name = "Scheme = {0}") + public static List parameters() throws IOException { + boolean httpsOnly = isHttpsTrafficOnly(); + return httpsOnly ? Arrays.asList("wasbs") : Arrays.asList("wasb", "wasbs"); + } + + private static boolean isHttpsTrafficOnly() throws IOException { + if (StringUtils.isNullOrWhitespaceOnly(RESOURCE_GROUP) || StringUtils.isNullOrWhitespaceOnly(TOKEN_CREDENTIALS_FILE)) { + // default to https only, as some fields are missing + return true; + } + + Assume.assumeTrue("Azure storage account not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCOUNT)); + + AzureTokenCredentials credentials = ApplicationTokenCredentials.fromFile(new File(TOKEN_CREDENTIALS_FILE)); + Azure azure = + StringUtils.isNullOrWhitespaceOnly(SUBSCRIPTION_ID) ? 
+ Azure.authenticate(credentials).withDefaultSubscription() : + Azure.authenticate(credentials).withSubscription(SUBSCRIPTION_ID); + + return azure.storageAccounts().getByResourceGroup(RESOURCE_GROUP, ACCOUNT).inner().enableHttpsTrafficOnly(); + } + + @BeforeClass + public static void checkCredentialsAndSetup() throws IOException { + // check whether credentials and container details exist + Assume.assumeTrue("Azure container not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(CONTAINER)); + Assume.assumeTrue("Azure access key not configured, skipping test...", !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY)); + + // initialize configuration with valid credentials + final Configuration conf = new Configuration(); + // fs.azure.account.key.youraccount.blob.core.windows.net = ACCESS_KEY + conf.setString("fs.azure.account.key." + ACCOUNT + ".blob.core.windows.net", ACCESS_KEY); + FileSystem.initialize(conf); + } + + @AfterClass + public static void clearFsConfig() throws IOException { + FileSystem.initialize(new Configuration()); + } + + @Override + public FileSystem getFileSystem() throws Exception { + return getBasePath().getFileSystem(); + } + + @Override + public Path getBasePath() { + // wasb(s)://yourcontainer@youraccount.blob.core.windows.net/testDataDir + String uriString = scheme + "://" + CONTAINER + '@' + ACCOUNT + ".blob.core.windows.net/" + TEST_DATA_DIR; + return new Path(uriString); + } + + @Test + public void testSimpleFileWriteAndRead() throws Exception { + final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs + + final String testLine = "Hello Upload!"; + + final Path path = new Path(getBasePath() + "/test.txt"); + final FileSystem fs = path.getFileSystem(); + + try { + try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE); + OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) { + writer.write(testLine); + } + + // just in case, wait for the path to exist + 
checkPathEventualExistence(fs, path, true, deadline); + + try (FSDataInputStream in = fs.open(path); + InputStreamReader ir = new InputStreamReader(in, StandardCharsets.UTF_8); + BufferedReader reader = new BufferedReader(ir)) { + String line = reader.readLine(); + assertEquals(testLine, line); + } + } + finally { + fs.delete(path, false); + } + + // now file must be gone + checkPathEventualExistence(fs, path, false, deadline); + } + + @Test + public void testDirectoryListing() throws Exception { + final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs + + final Path directory = new Path(getBasePath() + "/testdir/"); + final FileSystem fs = directory.getFileSystem(); + + // directory must not yet exist + assertFalse(fs.exists(directory)); + + try { + // create directory + assertTrue(fs.mkdirs(directory)); + + checkPathEventualExistence(fs, directory, true, deadline); + + // directory empty + assertEquals(0, fs.listStatus(directory).length); + + // create some files + final int numFiles = 3; + for (int i = 0; i < numFiles; i++) { + Path file = new Path(directory, "/file-" + i); + try (FSDataOutputStream out = fs.create(file, FileSystem.WriteMode.OVERWRITE); + OutputStreamWriter writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)) { + writer.write("hello-" + i + "\n"); + } + // just in case, wait for the file to exist (should then also be reflected in the + // directory's file list below) + checkPathEventualExistence(fs, file, true, deadline); + } + + FileStatus[] files = fs.listStatus(directory); + assertNotNull(files); + assertEquals(3, files.length); + + for (FileStatus status : files) { + assertFalse(status.isDir()); + } + + // now that there are files, the directory must exist + assertTrue(fs.exists(directory)); + } + finally { + // clean up + fs.delete(directory, true); + } + + // now directory must be gone + checkPathEventualExistence(fs, directory, false, deadline); + } + + @Override + public FileSystemKind getFileSystemKind() { + 
return FileSystemKind.OBJECT_STORE; + } +} diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties b/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties new file mode 100644 index 00000000000..2be35890d31 --- /dev/null +++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/resources/log4j-test.properties @@ -0,0 +1,27 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# testlogger is set to be a ConsoleAppender. 
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target = System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java index 6a5976aa9ad..1135e011bf4 100644 --- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java +++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java @@ -224,8 +224,8 @@ public class HadoopFileSystem extends FileSystem { static FileSystemKind getKindForScheme(String scheme) { scheme = scheme.toLowerCase(Locale.US); - if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss")) { - // the Amazon S3 storage or Aliyun OSS storage + if (scheme.startsWith("s3") || scheme.startsWith("emr") || scheme.startsWith("oss") || scheme.startsWith("wasb")) { + // the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage return FileSystemKind.OBJECT_STORE; } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) { diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java similarity index 99% rename from flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java rename to flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java index 1bbb7574277..aa8fdfe6445 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/HadoopConfigLoader.java +++ 
b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopConfigLoader.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.fs.s3.common; +package org.apache.flink.runtime.util; import org.apache.flink.configuration.Configuration; diff --git a/flink-filesystems/flink-s3-fs-base/pom.xml b/flink-filesystems/flink-s3-fs-base/pom.xml index 0b640a4197e..00d40867293 100644 --- a/flink-filesystems/flink-s3-fs-base/pom.xml +++ b/flink-filesystems/flink-s3-fs-base/pom.xml @@ -166,7 +166,7 @@ under the License. org.apache.flink:flink-hadoop-fs - org/apache/flink/runtime/util/** + org/apache/flink/runtime/util/HadoopUtils org/apache/flink/runtime/fs/hdfs/HadoopRecoverable* diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java index ff575be6f55..a576a96ae9c 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/AbstractS3FileSystemFactory.java @@ -26,6 +26,7 @@ import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.FileSystemFactory; import org.apache.flink.fs.s3.common.writer.S3AccessHelper; +import org.apache.flink.runtime.util.HadoopConfigLoader; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java index 943de1d8897..ebf3b672b51 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java +++ 
b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java @@ -20,6 +20,7 @@ package org.apache.flink.fs.s3.common; import org.apache.flink.configuration.Configuration; import org.apache.flink.fs.s3.common.writer.S3AccessHelper; +import org.apache.flink.runtime.util.HadoopConfigLoader; import org.apache.flink.util.TestLogger; import org.apache.hadoop.fs.FileSystem; diff --git a/flink-filesystems/flink-s3-fs-hadoop/pom.xml b/flink-filesystems/flink-s3-fs-hadoop/pom.xml index 9a5a80c053d..e7cf95e232b 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/pom.xml +++ b/flink-filesystems/flink-s3-fs-hadoop/pom.xml @@ -106,6 +106,11 @@ under the License. com.amazon org.apache.flink.fs.s3base.shaded.com.amazon + + + org.apache.flink.runtime.util + org.apache.flink.fs.s3hadoop.common + diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java index 2637e7b2e23..6cad0511794 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/main/java/org/apache/flink/fs/s3hadoop/S3FileSystemFactory.java @@ -20,8 +20,8 @@ package org.apache.flink.fs.s3hadoop; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory; -import org.apache.flink.fs.s3.common.HadoopConfigLoader; import org.apache.flink.fs.s3.common.writer.S3AccessHelper; +import org.apache.flink.runtime.util.HadoopConfigLoader; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.s3a.S3AFileSystem; diff --git a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java index 
4471b3868df..57500f37ca7 100644 --- a/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java +++ b/flink-filesystems/flink-s3-fs-hadoop/src/test/java/org/apache/flink/fs/s3hadoop/HadoopS3FileSystemTest.java @@ -19,7 +19,7 @@ package org.apache.flink.fs.s3hadoop; import org.apache.flink.configuration.Configuration; -import org.apache.flink.fs.s3.common.HadoopConfigLoader; +import org.apache.flink.runtime.util.HadoopConfigLoader; import org.junit.Test; diff --git a/flink-filesystems/flink-s3-fs-presto/pom.xml b/flink-filesystems/flink-s3-fs-presto/pom.xml index 8f88bbfdac0..3fc8e03d124 100644 --- a/flink-filesystems/flink-s3-fs-presto/pom.xml +++ b/flink-filesystems/flink-s3-fs-presto/pom.xml @@ -290,6 +290,12 @@ under the License. com.google org.apache.flink.fs.s3presto.shaded.com.google + + + + org.apache.flink.runtime.util + org.apache.flink.fs.s3presto.common + diff --git a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java index c0c1beb6afe..5a1ffeef612 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-presto/src/main/java/org/apache/flink/fs/s3presto/S3FileSystemFactory.java @@ -20,8 +20,8 @@ package org.apache.flink.fs.s3presto; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory; -import org.apache.flink.fs.s3.common.HadoopConfigLoader; import org.apache.flink.fs.s3.common.writer.S3AccessHelper; +import org.apache.flink.runtime.util.HadoopConfigLoader; import org.apache.flink.util.FlinkRuntimeException; import com.facebook.presto.hive.s3.PrestoS3FileSystem; diff --git a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java 
b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java index 093efc8efab..f3117a277d6 100644 --- a/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java +++ b/flink-filesystems/flink-s3-fs-presto/src/test/java/org/apache/flink/fs/s3presto/PrestoS3FileSystemTest.java @@ -21,7 +21,7 @@ package org.apache.flink.fs.s3presto; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.fs.s3.common.FlinkS3FileSystem; -import org.apache.flink.fs.s3.common.HadoopConfigLoader; +import org.apache.flink.runtime.util.HadoopConfigLoader; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSStaticCredentialsProvider; diff --git a/flink-filesystems/pom.xml b/flink-filesystems/pom.xml index 8da2ceac34b..c84e8535b83 100644 --- a/flink-filesystems/pom.xml +++ b/flink-filesystems/pom.xml @@ -47,6 +47,7 @@ under the License. flink-s3-fs-presto flink-swift-fs-hadoop flink-oss-fs-hadoop + flink-azure-fs-hadoop -- GitLab