Commit e9703e70 authored by Andrey Zagrebin, committed by kkloudas

[FLINK-10627][e2e] Test s3 output for streaming file sink.

This closes #6957.
Parent c58d37ed
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-end-to-end-tests</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.8-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-e2e-test-utils</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.437</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>S3UtilProgram</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>S3UtilProgram</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.streaming.tests.util.s3.S3UtilProgram</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
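<!-- Note (added for clarity): with the finalName above, the shaded jar is written to
     target/S3UtilProgram.jar; the e2e scripts (e.g. test-scripts/common_s3.sh) invoke it
     via `java -jar .../flink-e2e-test-utils/target/S3UtilProgram.jar`. -->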
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.tests.util.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CSVInput;
import com.amazonaws.services.s3.model.CSVOutput;
import com.amazonaws.services.s3.model.CompressionType;
import com.amazonaws.services.s3.model.ExpressionType;
import com.amazonaws.services.s3.model.InputSerialization;
import com.amazonaws.services.s3.model.OutputSerialization;
import com.amazonaws.services.s3.model.SelectObjectContentEvent;
import com.amazonaws.services.s3.model.SelectObjectContentEventStream;
import com.amazonaws.services.s3.model.SelectObjectContentEventVisitor;
import com.amazonaws.services.s3.model.SelectObjectContentRequest;
import com.amazonaws.services.s3.model.SelectObjectContentResult;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import static com.amazonaws.util.IOUtils.copy;
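/*
 * Usage sketch (bucket and object key are illustrative, not taken from the test suite):
 *
 *   AmazonS3 client = AmazonS3ClientBuilder.defaultClient();
 *   String lineCount = S3QueryUtil.queryFile(
 *       client, "my-test-bucket", "out/part-0-0", "select count(*) from s3object");
 *   client.shutdown();
 */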
class S3QueryUtil {
/** Runs an SQL query over a non-compressed CSV file stored in an s3 object. */
static String queryFile(
AmazonS3 s3client, String bucket, String s3file, @SuppressWarnings("SameParameterValue") String query) {
SelectObjectContentRequest request = generateBaseCSVRequest(bucket, s3file, query);
final AtomicBoolean isResultComplete = new AtomicBoolean(false);
String res;
try (SelectObjectContentResult result = s3client.selectObjectContent(request);
SelectObjectContentEventStream payload = result.getPayload();
ByteArrayOutputStream out = new ByteArrayOutputStream()) {
InputStream resultInputStream = payload.getRecordsInputStream(
new SelectObjectContentEventVisitor() {
@Override
public void visit(SelectObjectContentEvent.EndEvent event) {
isResultComplete.set(true);
}
}
);
copy(resultInputStream, out);
res = out.toString().trim();
} catch (Throwable e) {
System.out.println("SQL query failure");
throw new RuntimeException("SQL query failure", e);
}
/*
* The End Event indicates all matching records have been transmitted.
* If the End Event is not received, the results may be incomplete.
*/
if (!isResultComplete.get()) {
throw new RuntimeException("S3 Select request was incomplete as End Event was not received.");
}
return res;
}
private static SelectObjectContentRequest generateBaseCSVRequest(String bucket, String key, String query) {
SelectObjectContentRequest request = new SelectObjectContentRequest();
request.setBucketName(bucket);
request.setKey(key);
request.setExpression(query);
request.setExpressionType(ExpressionType.SQL);
InputSerialization inputSerialization = new InputSerialization();
inputSerialization.setCsv(new CSVInput());
inputSerialization.setCompressionType(CompressionType.NONE);
request.setInputSerialization(inputSerialization);
OutputSerialization outputSerialization = new OutputSerialization();
outputSerialization.setCsv(new CSVOutput());
request.setOutputSerialization(outputSerialization);
return request;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.tests.util.s3;
import org.apache.flink.api.java.utils.ParameterTool;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.transfer.KeyFilter;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
/**
* S3 utilities.
*
* <p>Usage: java -jar S3UtilProgram.jar args.
*
* <p>Note: {@code S3UtilProgram.Action.lineNumber*} actions are applicable only
* to valid, non-compressed, comma-separated CSV files.
*
* <p>Program parameters:
* <ul>
* <li>action (string, required): Action to perform, see {@link S3UtilProgram.Action}.</li>
* <li>bucket (string, required): Bucket where s3 objects reside.</li>
* <li>s3file (string, required for single object actions): s3 object key.</li>
* <li>s3prefix (string, required for actions over objects grouped by key prefix): s3 key prefix.</li>
* <li>s3filePrefix (string, optional for downloadByFullPathAndFileNamePrefix or numberOfLinesInFilesWithFullAndNamePrefix):
* s3 file name prefix, without the directory part, used to filter files by name.</li>
* <li>localFile (string, required for single file actions): local file path.</li>
* <li>localFolder (string, required for actions over folders): local folder path.</li>
* <li>parallelism (int, default 10): parallelism for parallelizable actions
* (e.g. {@code numberOfLinesInFilesWithFullAndNamePrefix}).</li>
* </ul>
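*
* <p>Example invocation (the bucket and object key are illustrative, not taken from the tests):
* <pre>{@code
* java -jar S3UtilProgram.jar --action numberOfLinesInFile --bucket my-test-bucket --s3file out/part-0-0
* }</pre>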
*/
class S3UtilProgram {
enum Action {
listByFullPathPrefix,
downloadFile,
downloadByFullPathAndFileNamePrefix,
deleteFile,
deleteByFullPathPrefix,
numberOfLinesInFile,
numberOfLinesInFilesWithFullAndNamePrefix
}
private static final Map<Action, Consumer<ParameterTool>> handlers;
static {
Map<Action, Consumer<ParameterTool>> handlersMutable = new HashMap<>();
handlersMutable.put(Action.listByFullPathPrefix, S3UtilProgram::listByFullPathPrefix);
handlersMutable.put(Action.downloadFile, S3UtilProgram::downloadFile);
handlersMutable.put(Action.downloadByFullPathAndFileNamePrefix,
S3UtilProgram::downloadByFullPathAndFileNamePrefix);
handlersMutable.put(Action.deleteFile, S3UtilProgram::deleteFile);
handlersMutable.put(Action.deleteByFullPathPrefix, S3UtilProgram::deleteByFullPathPrefix);
handlersMutable.put(Action.numberOfLinesInFile, S3UtilProgram::numberOfLinesInFile);
handlersMutable.put(Action.numberOfLinesInFilesWithFullAndNamePrefix,
S3UtilProgram::numberOfLinesInFilesWithFullAndNamePrefix);
handlers = Collections.unmodifiableMap(handlersMutable);
}
private static final String countQuery = "select count(*) from s3object";
public static void main(String[] args) {
final ParameterTool params = ParameterTool.fromArgs(args);
final Action action = Action.valueOf(params.getRequired("action"));
handlers.get(action).accept(params);
}
private static void listByFullPathPrefix(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3prefix = params.getRequired("s3prefix");
listByFullPathPrefix(bucket, s3prefix).forEach(System.out::println);
}
private static List<String> listByFullPathPrefix(final String bucket, final String s3prefix) {
return AmazonS3ClientBuilder.defaultClient().listObjects(bucket, s3prefix).getObjectSummaries()
.stream().map(S3ObjectSummary::getKey).collect(Collectors.toList());
}
private static void downloadFile(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3file = params.getRequired("s3file");
final String localFile = params.getRequired("localFile");
TransferManager tx = TransferManagerBuilder.defaultTransferManager();
try {
tx.download(bucket, s3file, new File(localFile)).waitForCompletion();
} catch (InterruptedException e) {
System.out.println("Transfer interrupted");
} finally {
tx.shutdownNow();
}
}
private static void downloadByFullPathAndFileNamePrefix(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3prefix = params.getRequired("s3prefix");
final String localFolder = params.getRequired("localFolder");
final String s3filePrefix = params.get("s3filePrefix", "");
TransferManager tx = TransferManagerBuilder.defaultTransferManager();
Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
KeyFilter keyFilter = s3filePrefix.isEmpty() ? KeyFilter.INCLUDE_ALL :
objectSummary -> keyPredicate.test(objectSummary.getKey());
try {
tx.downloadDirectory(bucket, s3prefix, new File(localFolder), keyFilter).waitForCompletion();
} catch (InterruptedException e) {
System.out.println("Transfer interrupted");
} finally {
tx.shutdownNow();
}
}
private static Predicate<String> getKeyFilterByFileNamePrefix(String s3filePrefix) {
if (s3filePrefix.isEmpty()) {
return key -> true;
} else {
return key -> {
String[] parts = key.split("/");
String fileName = parts[parts.length - 1];
return fileName.startsWith(s3filePrefix);
};
}
}
private static void deleteFile(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3file = params.getRequired("s3file");
AmazonS3ClientBuilder.defaultClient().deleteObject(bucket, s3file);
}
private static void deleteByFullPathPrefix(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3prefix = params.getRequired("s3prefix");
String[] keys = listByFullPathPrefix(bucket, s3prefix).toArray(new String[] {});
if (keys.length > 0) {
DeleteObjectsRequest request = new DeleteObjectsRequest(bucket).withKeys(keys);
AmazonS3ClientBuilder.defaultClient().deleteObjects(request);
}
}
private static void numberOfLinesInFile(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3file = params.getRequired("s3file");
AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient();
System.out.print(S3QueryUtil.queryFile(s3client, bucket, s3file, countQuery));
s3client.shutdown();
}
private static void numberOfLinesInFilesWithFullAndNamePrefix(ParameterTool params) {
final String bucket = params.getRequired("bucket");
final String s3prefix = params.getRequired("s3prefix");
final String s3filePrefix = params.get("s3filePrefix", "");
int parallelism = params.getInt("parallelism", 10);
List<String> files = listByFullPathPrefix(bucket, s3prefix);
ExecutorService executor = Executors.newFixedThreadPool(parallelism);
AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient();
List<CompletableFuture<Integer>> requests =
submitLineCountingRequestsForFilesAsync(executor, s3client, bucket, files, s3filePrefix);
int count = waitAndComputeTotalLineCountResult(requests);
executor.shutdownNow();
s3client.shutdown();
System.out.print(count);
}
private static List<CompletableFuture<Integer>> submitLineCountingRequestsForFilesAsync(
ExecutorService executor, AmazonS3 s3client, String bucket, List<String> files, String s3filePrefix) {
List<CompletableFuture<Integer>> requests = new ArrayList<>();
Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
files.forEach(file -> {
if (keyPredicate.test(file)) {
CompletableFuture<Integer> result = new CompletableFuture<>();
executor.execute(() ->
result.complete(Integer.parseInt(S3QueryUtil.queryFile(s3client, bucket, file, countQuery))));
requests.add(result);
}
});
return requests;
}
private static int waitAndComputeTotalLineCountResult(List<CompletableFuture<Integer>> requests) {
int count = 0;
for (CompletableFuture<Integer> result : requests) {
try {
count += result.get();
} catch (Throwable e) {
System.out.println("Failed to count lines");
e.printStackTrace();
}
}
return count;
}
}
@@ -55,6 +55,7 @@ under the License.
<module>flink-sql-client-test</module>
<module>flink-streaming-file-sink-test</module>
<module>flink-state-evolution-test</module>
<module>flink-e2e-test-utils</module>
</modules>
<build>
@@ -101,6 +101,7 @@ run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_b
run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" "skip_check_exceptions"
run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"
run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" "skip_check_exceptions"
run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false" "skip_check_exceptions"
@@ -462,43 +462,6 @@ function check_result_hash {
fi
}
function s3_put {
local_file=$1
bucket=$2
s3_file=$3
resource="/${bucket}/${s3_file}"
contentType="application/octet-stream"
dateValue=`date -R`
stringToSign="PUT\n\n${contentType}\n${dateValue}\n${resource}"
s3Key=$ARTIFACTS_AWS_ACCESS_KEY
s3Secret=$ARTIFACTS_AWS_SECRET_KEY
signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
curl -X PUT -T "${local_file}" \
-H "Host: ${bucket}.s3.amazonaws.com" \
-H "Date: ${dateValue}" \
-H "Content-Type: ${contentType}" \
-H "Authorization: AWS ${s3Key}:${signature}" \
https://${bucket}.s3.amazonaws.com/${s3_file}
}
function s3_delete {
bucket=$1
s3_file=$2
resource="/${bucket}/${s3_file}"
contentType="application/octet-stream"
dateValue=`date -R`
stringToSign="DELETE\n\n${contentType}\n${dateValue}\n${resource}"
s3Key=$ARTIFACTS_AWS_ACCESS_KEY
s3Secret=$ARTIFACTS_AWS_SECRET_KEY
signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
curl -X DELETE \
-H "Host: ${bucket}.s3.amazonaws.com" \
-H "Date: ${dateValue}" \
-H "Content-Type: ${contentType}" \
-H "Authorization: AWS ${s3Key}:${signature}" \
https://${bucket}.s3.amazonaws.com/${s3_file}
}
# This function starts the given number of task managers and monitors their processes.
# If a task manager process goes away a replacement is started.
function tm_watchdog {
@@ -692,3 +655,20 @@ function expect_in_taskmanager_logs {
fi
done
}
function wait_for_restart_to_complete {
local base_num_restarts=$1
local jobid=$2
local current_num_restarts=${base_num_restarts}
local expected_num_restarts=$((current_num_restarts + 1))
echo "Waiting for restart to happen"
while ! [[ ${current_num_restarts} -eq ${expected_num_restarts} ]]; do
sleep 5
current_num_restarts=$(get_job_metric ${jobid} "fullRestarts")
if [[ -z ${current_num_restarts} ]]; then
current_num_restarts=${base_num_restarts}
fi
done
}
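# Example (illustrative): after killing one TM, `wait_for_restart_to_complete 0 ${JOB_ID}`
# returns once the job's "fullRestarts" metric has increased from 0 to 1.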
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
echo "Did not find AWS environment variables, NOT running the e2e test."
exit 0
else
echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running the e2e test."
fi
if [[ -z "$ARTIFACTS_AWS_ACCESS_KEY" ]]; then
echo "Did not find AWS environment variables, NOT running the e2e test."
exit 0
else
echo "Found AWS access key, running the e2e test."
fi
if [[ -z "$ARTIFACTS_AWS_SECRET_KEY" ]]; then
echo "Did not find AWS environment variables, NOT running the e2e test."
exit 0
else
echo "Found AWS secret key, running the e2e test."
fi
AWS_REGION="${AWS_REGION:-eu-west-1}"
AWS_ACCESS_KEY=$ARTIFACTS_AWS_ACCESS_KEY
AWS_SECRET_KEY=$ARTIFACTS_AWS_SECRET_KEY
s3util="java -jar ${END_TO_END_DIR}/flink-e2e-test-utils/target/S3UtilProgram.jar"
###################################
# Setup Flink s3 access.
#
# Globals:
# FLINK_DIR
# ARTIFACTS_AWS_ACCESS_KEY
# ARTIFACTS_AWS_SECRET_KEY
# Arguments:
# None
# Returns:
# None
###################################
function s3_setup {
# make sure we remove the s3 filesystem jar and credentials from the Flink setup at the end
function s3_cleanup {
rm $FLINK_DIR/lib/flink-s3-fs*.jar
# remove any leftover settings
sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
}
trap s3_cleanup EXIT
cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
}
s3_setup
###################################
# List s3 objects by full path prefix.
#
# Globals:
# ARTIFACTS_AWS_BUCKET
# Arguments:
# $1 - s3 full path key prefix
# Returns:
# List of s3 object keys, separated by newline
###################################
function s3_list {
AWS_REGION=$AWS_REGION \
${s3util} --action listByFullPathPrefix --s3prefix "$1" --bucket $ARTIFACTS_AWS_BUCKET
}
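# Example (illustrative prefix): `s3_list "out"` prints the key of every object in
# $ARTIFACTS_AWS_BUCKET whose key starts with "out", one per line.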
###################################
# Download s3 object.
#
# Globals:
# ARTIFACTS_AWS_BUCKET
# Arguments:
# $1 - local path to save file
# $2 - s3 object key
# Returns:
# None
###################################
function s3_get {
AWS_REGION=$AWS_REGION \
${s3util} --action downloadFile --localFile "$1" --s3file "$2" --bucket $ARTIFACTS_AWS_BUCKET
}
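# Example (illustrative): `s3_get "$TEST_DATA_DIR/words" "test-data/words"` downloads the
# object "test-data/words" from $ARTIFACTS_AWS_BUCKET into the given local file.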
###################################
# Download s3 objects to folder by full path prefix.
#
# Globals:
# ARTIFACTS_AWS_BUCKET
# Arguments:
# $1 - local path to save folder with files
# $2 - s3 key full path prefix
# $3 - s3 file name prefix w/o directory to filter files by name (optional)
# Returns:
# None
###################################
function s3_get_by_full_path_and_filename_prefix {
local file_prefix="${3-}"
AWS_REGION=$AWS_REGION \
${s3util} --action downloadByFullPathAndFileNamePrefix \
--localFolder "$1" --s3prefix "$2" --s3filePrefix "${file_prefix}" --bucket $ARTIFACTS_AWS_BUCKET
}
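# Example (illustrative): `s3_get_by_full_path_and_filename_prefix "$TEST_DATA_DIR" "out" "part-"`
# downloads only the part files under the "out" prefix into $TEST_DATA_DIR.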
###################################
# Upload file to s3 object.
#
# Globals:
# ARTIFACTS_AWS_ACCESS_KEY
# ARTIFACTS_AWS_SECRET_KEY
# Arguments:
# $1 - local file to upload
# $2 - s3 bucket
# $3 - s3 object key
# Returns:
# None
###################################
function s3_put {
local_file=$1
bucket=$2
s3_file=$3
resource="/${bucket}/${s3_file}"
contentType="application/octet-stream"
dateValue=`date -R`
stringToSign="PUT\n\n${contentType}\n${dateValue}\n${resource}"
s3Key=$ARTIFACTS_AWS_ACCESS_KEY
s3Secret=$ARTIFACTS_AWS_SECRET_KEY
signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
curl -X PUT -T "${local_file}" \
-H "Host: ${bucket}.s3.amazonaws.com" \
-H "Date: ${dateValue}" \
-H "Content-Type: ${contentType}" \
-H "Authorization: AWS ${s3Key}:${signature}" \
https://${bucket}.s3.amazonaws.com/${s3_file}
}
###################################
# Delete s3 object.
#
# Globals:
# ARTIFACTS_AWS_ACCESS_KEY
# ARTIFACTS_AWS_SECRET_KEY
# Arguments:
# $1 - s3 bucket
# $2 - s3 object key
# Returns:
# None
###################################
function s3_delete {
bucket=$1
s3_file=$2
resource="/${bucket}/${s3_file}"
contentType="application/octet-stream"
dateValue=`date -R`
stringToSign="DELETE\n\n${contentType}\n${dateValue}\n${resource}"
s3Key=$ARTIFACTS_AWS_ACCESS_KEY
s3Secret=$ARTIFACTS_AWS_SECRET_KEY
signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
curl -X DELETE \
-H "Host: ${bucket}.s3.amazonaws.com" \
-H "Date: ${dateValue}" \
-H "Content-Type: ${contentType}" \
-H "Authorization: AWS ${s3Key}:${signature}" \
https://${bucket}.s3.amazonaws.com/${s3_file}
}
###################################
# Delete s3 objects by full path prefix.
#
# Globals:
# ARTIFACTS_AWS_BUCKET
# Arguments:
# $1 - s3 key full path prefix
# Returns:
# None
###################################
function s3_delete_by_full_path_prefix {
AWS_REGION=$AWS_REGION \
${s3util} --action deleteByFullPathPrefix --s3prefix "$1" --bucket $ARTIFACTS_AWS_BUCKET
}
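# Example (illustrative): `s3_delete_by_full_path_prefix "out"` removes every object in
# $ARTIFACTS_AWS_BUCKET whose key starts with "out".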
###################################
# Count the number of lines in a file stored as an s3 object.
# The lines have to be simple enough to comply with the CSV format,
# because an SQL (S3 Select) query is used to count them.
#
# Globals:
# ARTIFACTS_AWS_BUCKET
# Arguments:
# $1 - s3 file object key
# Returns:
# number of lines in the file (printed to stdout)
###################################
function s3_get_number_of_lines_in_file {
AWS_REGION=$AWS_REGION \
${s3util} --action numberOfLinesInFile --s3file "$1" --bucket $ARTIFACTS_AWS_BUCKET
}
###################################
# Count the total number of lines in the files of s3 objects filtered by prefix.
# The lines have to be simple enough to comply with the CSV format,
# because an SQL (S3 Select) query is used to count them.
#
# Globals:
# ARTIFACTS_AWS_BUCKET
# Arguments:
# $1 - s3 key full path prefix
# $2 - s3 file name prefix without directory to filter files by name (optional)
# Returns:
# total number of lines in the matching files (printed to stdout)
###################################
function s3_get_number_of_lines_by_prefix {
local file_prefix="${2-}"
AWS_REGION=$AWS_REGION \
${s3util} --action numberOfLinesInFilesWithFullAndNamePrefix \
--s3prefix "$1" --s3filePrefix "${file_prefix}" --bucket $ARTIFACTS_AWS_BUCKET
}
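# Example (illustrative): `s3_get_number_of_lines_by_prefix "out" "part-"` prints the total
# number of lines across all part files under the "out" prefix.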
@@ -19,33 +19,18 @@
# Tests for our shaded/bundled Hadoop S3A file system.
if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
echo "Did not find AWS environment variables, NOT running Shaded Hadoop S3A e2e tests."
exit 0
else
echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running Shaded Hadoop S3A e2e tests."
fi
source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/common_s3.sh
s3_put $TEST_INFRA_DIR/test-data/words $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-s3a
# make sure we delete the file at the end
function s3_cleanup {
function shaded_s3a_cleanup {
s3_delete $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-s3a
rm $FLINK_DIR/lib/flink-s3-fs*.jar
# remove any leftover settings
sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
}
trap s3_cleanup EXIT
cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
trap shaded_s3a_cleanup EXIT
start_cluster
$FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input s3:/$resource --output $TEST_DATA_DIR/out/wc_out
check_result_hash "WordCountWithShadedS3A" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
\ No newline at end of file
check_result_hash "WordCountWithShadedS3A" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
@@ -19,33 +19,18 @@
# Tests for our shaded/bundled Presto S3 file system.
if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
echo "Did not find AWS environment variables, NOT running Shaded Presto S3 e2e tests."
exit 0
else
echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running Shaded Presto S3 e2e tests."
fi
source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/common_s3.sh
s3_put $TEST_INFRA_DIR/test-data/words $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-presto-s3
# make sure we delete the file at the end
function s3_cleanup {
function shaded_presto_s3_cleanup {
s3_delete $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-presto-s3
rm $FLINK_DIR/lib/flink-s3-fs*.jar
}
trap s3_cleanup EXIT
cp $FLINK_DIR/opt/flink-s3-fs-presto-*.jar $FLINK_DIR/lib/
echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
trap shaded_presto_s3_cleanup EXIT
start_cluster
$FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input s3:/$resource --output $TEST_DATA_DIR/out/wc_out
check_result_hash "WordCountWithShadedPrestoS3" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
# remove any leftover settings
sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
@@ -17,27 +17,37 @@
# limitations under the License.
################################################################################
source "$(dirname "$0")"/common.sh
TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
OUT_TYPE="${1:-local}" # other type: s3
OUTPUT_PATH="$TEST_DATA_DIR/out"
function wait_for_restart {
local base_num_restarts=$1
local current_num_restarts=${base_num_restarts}
local expected_num_restarts=$((current_num_restarts + 1))
source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/common_s3.sh
OUT=out
OUTPUT_PATH="$TEST_DATA_DIR/$OUT"
S3_OUTPUT_PATH="s3://$ARTIFACTS_AWS_BUCKET/$OUT"
mkdir -p $OUTPUT_PATH
if [ "${OUT_TYPE}" == "local" ]; then
echo "Use local output"
JOB_OUTPUT_PATH=${OUTPUT_PATH}
elif [ "${OUT_TYPE}" == "s3" ]; then
echo "Use s3 output"
JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
else
echo "Unknown output type: ${OUT_TYPE}"
exit 1
fi
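# Example (illustrative): running `test_streaming_file_sink.sh s3` makes the job write to
# s3://$ARTIFACTS_AWS_BUCKET/out, while the default local run writes to $TEST_DATA_DIR/out.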
echo "Waiting for restart to happen"
while ! [[ ${current_num_restarts} -eq ${expected_num_restarts} ]]; do
sleep 5
current_num_restarts=$(get_job_metric ${JOB_ID} "fullRestarts")
if [[ -z ${current_num_restarts} ]]; then
current_num_restarts=${base_num_restarts}
fi
done
# make sure we clean up the s3 output at the end
function out_cleanup {
s3_delete_by_full_path_prefix $OUT
}
if [ "${OUT_TYPE}" == "s3" ]; then
trap out_cleanup EXIT
fi
TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
###################################
# Get all lines in part files and sort them numerically.
@@ -47,10 +57,32 @@ function wait_for_restart {
# Arguments:
# None
# Returns:
# None
# sorted content of part files
###################################
function get_complete_result {
find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
if [ "${OUT_TYPE}" == "s3" ]; then
rm -rf $OUTPUT_PATH; mkdir -p $OUTPUT_PATH
s3_get_by_full_path_and_filename_prefix ${TEST_DATA_DIR} "${OUT}" "part-"
fi
find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
}
###################################
# Get total number of lines in part files.
#
# Globals:
# OUT
# Arguments:
# None
# Returns:
# total number of lines in part files (printed to stdout)
###################################
function get_total_number_of_valid_lines {
if [ "${OUT_TYPE}" == "local" ]; then
get_complete_result | wc -l | tr -d '[:space:]'
elif [ "${OUT_TYPE}" == "s3" ]; then
s3_get_number_of_lines_by_prefix "${OUT}" "part-"
fi
}
###################################
@@ -83,7 +115,7 @@ function wait_for_complete_result {
sleep ${polling_interval}
((seconds_elapsed += ${polling_interval}))
number_of_values=$(get_complete_result | wc -l | tr -d '[:space:]')
number_of_values=$(get_total_number_of_valid_lines)
if [[ ${previous_number_of_values} -ne ${number_of_values} ]]; then
echo "Number of produced values ${number_of_values}/${expected_number_of_values}"
previous_number_of_values=${number_of_values}
@@ -98,7 +130,7 @@ start_cluster
"${FLINK_DIR}/bin/taskmanager.sh" start
echo "Submitting job."
CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${OUTPUT_PATH}")
CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${JOB_OUTPUT_PATH}")
JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
if [[ -z $JOB_ID ]]; then
@@ -117,7 +149,7 @@ kill_random_taskmanager
echo "Starting TM"
"$FLINK_DIR/bin/taskmanager.sh" start
wait_for_restart 0
wait_for_restart_to_complete 0 ${JOB_ID}
echo "Killing 2 TMs"
kill_random_taskmanager
@@ -127,7 +159,7 @@ echo "Starting 2 TMs"
"$FLINK_DIR/bin/taskmanager.sh" start
"$FLINK_DIR/bin/taskmanager.sh" start
wait_for_restart 1
wait_for_restart_to_complete 1 ${JOB_ID}
echo "Waiting until all values have been produced"
wait_for_complete_result 60000 300