diff --git a/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml b/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml
new file mode 100644
index 0000000000000000000000000000000000000000..67988b3ce6ba93728d57105851ad805bf663c946
--- /dev/null
+++ b/flink-end-to-end-tests/flink-e2e-test-utils/pom.xml
@@ -0,0 +1,71 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-e2e-test-utils</artifactId>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>com.amazonaws</groupId>
+			<artifactId>aws-java-sdk-s3</artifactId>
+			<version>1.11.437</version>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>S3UtilProgram</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<finalName>S3UtilProgram</finalName>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.tests.util.s3.S3UtilProgram</mainClass>
+								</transformer>
+							</transformers>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
new file mode 100644
index 0000000000000000000000000000000000000000..781a8edd20aca388ba4079e879613da5d813eb27
--- /dev/null
+++ b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3QueryUtil.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.util.s3;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.CSVInput;
+import com.amazonaws.services.s3.model.CSVOutput;
+import com.amazonaws.services.s3.model.CompressionType;
+import com.amazonaws.services.s3.model.ExpressionType;
+import com.amazonaws.services.s3.model.InputSerialization;
+import com.amazonaws.services.s3.model.OutputSerialization;
+import com.amazonaws.services.s3.model.SelectObjectContentEvent;
+import com.amazonaws.services.s3.model.SelectObjectContentEventStream;
+import com.amazonaws.services.s3.model.SelectObjectContentEventVisitor;
+import com.amazonaws.services.s3.model.SelectObjectContentRequest;
+import com.amazonaws.services.s3.model.SelectObjectContentResult;
+
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.amazonaws.util.IOUtils.copy;
+
+class S3QueryUtil {
+	/** Run SQL query over non-compressed CSV file saved in s3 object.
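+	 *
+	 * <p>The query is executed with S3 Select ({@code selectObjectContent}), so it runs
+	 * server-side and only the query result is transferred back to the caller.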
+	 */
+	static String queryFile(
+			AmazonS3 s3client, String bucket, String s3file, @SuppressWarnings("SameParameterValue") String query) {
+		SelectObjectContentRequest request = generateBaseCSVRequest(bucket, s3file, query);
+		final AtomicBoolean isResultComplete = new AtomicBoolean(false);
+		String res;
+		try (SelectObjectContentResult result = s3client.selectObjectContent(request);
+			SelectObjectContentEventStream payload = result.getPayload();
+			ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+			InputStream resultInputStream = payload.getRecordsInputStream(
+				new SelectObjectContentEventVisitor() {
+					@Override
+					public void visit(SelectObjectContentEvent.EndEvent event) {
+						isResultComplete.set(true);
+					}
+				}
+			);
+			copy(resultInputStream, out);
+			res = out.toString().trim();
+		} catch (Throwable e) {
+			System.out.println("SQL query failure");
+			throw new RuntimeException("SQL query failure", e);
+		}
+		/*
+		 * The End Event indicates all matching records have been transmitted.
+		 * If the End Event is not received, the results may be incomplete.
+		 */
+		if (!isResultComplete.get()) {
+			throw new RuntimeException("S3 Select request was incomplete as End Event was not received.");
+		}
+		return res;
+	}
+
+	private static SelectObjectContentRequest generateBaseCSVRequest(String bucket, String key, String query) {
+		SelectObjectContentRequest request = new SelectObjectContentRequest();
+		request.setBucketName(bucket);
+		request.setKey(key);
+		request.setExpression(query);
+		request.setExpressionType(ExpressionType.SQL);
+
+		InputSerialization inputSerialization = new InputSerialization();
+		inputSerialization.setCsv(new CSVInput());
+		inputSerialization.setCompressionType(CompressionType.NONE);
+		request.setInputSerialization(inputSerialization);
+
+		OutputSerialization outputSerialization = new OutputSerialization();
+		outputSerialization.setCsv(new CSVOutput());
+		request.setOutputSerialization(outputSerialization);
+
+		return request;
+	}
+}
diff --git a/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3UtilProgram.java b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3UtilProgram.java
new file mode 100644
index 0000000000000000000000000000000000000000..70cb55854c573f66546a420061a3b616784ee1a7
--- /dev/null
+++ b/flink-end-to-end-tests/flink-e2e-test-utils/src/main/java/org/apache/flink/streaming/tests/util/s3/S3UtilProgram.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests.util.s3;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.transfer.KeyFilter;
+import com.amazonaws.services.s3.transfer.TransferManager;
+import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+/**
+ * S3 utilities.
+ *
+ * <p>Usage: java -jar S3UtilProgram.jar args.
+ *
+ * <p>Note: {@code S3UtilProgram.Action.numberOfLines*} actions are applicable only
+ * to valid non-compressed comma-separated CSV files.
+ *
+ * <p>Program parameters:
+ * <ul>
+ *     <li>action (string, required): action to perform, one of the {@code Action} values below</li>
+ *     <li>bucket (string, required): s3 bucket name</li>
+ *     <li>s3file (string): s3 object key of a single file</li>
+ *     <li>s3prefix (string): s3 key full path prefix to filter objects</li>
+ *     <li>s3filePrefix (string, optional): file name prefix w/o directory to filter files by name</li>
+ *     <li>localFile (string): local path to save a downloaded file</li>
+ *     <li>localFolder (string): local folder to save downloaded files</li>
+ *     <li>parallelism (int, optional, default 10): number of concurrent S3 Select count requests</li>
+ * </ul>
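+ *
+ * <p>Example (with a hypothetical bucket and key):
+ * {@code java -jar S3UtilProgram.jar --action numberOfLinesInFile --bucket my-bucket --s3file out/part-0-0}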
+ */
+class S3UtilProgram {
+	enum Action {
+		listByFullPathPrefix,
+		downloadFile,
+		downloadByFullPathAndFileNamePrefix,
+		deleteFile,
+		deleteByFullPathPrefix,
+		numberOfLinesInFile,
+		numberOfLinesInFilesWithFullAndNamePrefix
+	}
+
+	private static final Map<Action, Consumer<ParameterTool>> handlers;
+	static {
+		Map<Action, Consumer<ParameterTool>> handlersMutable = new HashMap<>();
+		handlersMutable.put(Action.listByFullPathPrefix, S3UtilProgram::listByFullPathPrefix);
+		handlersMutable.put(Action.downloadFile, S3UtilProgram::downloadFile);
+		handlersMutable.put(Action.downloadByFullPathAndFileNamePrefix,
+			S3UtilProgram::downloadByFullPathAndFileNamePrefix);
+		handlersMutable.put(Action.deleteFile, S3UtilProgram::deleteFile);
+		handlersMutable.put(Action.deleteByFullPathPrefix, S3UtilProgram::deleteByFullPathPrefix);
+		handlersMutable.put(Action.numberOfLinesInFile, S3UtilProgram::numberOfLinesInFile);
+		handlersMutable.put(Action.numberOfLinesInFilesWithFullAndNamePrefix,
+			S3UtilProgram::numberOfLinesInFilesWithFullAndNamePrefix);
+		handlers = Collections.unmodifiableMap(handlersMutable);
+	}
+
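+	// SQL expression used with S3 Select to count the lines of a CSV object.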
+	private static final String countQuery = "select count(*) from s3object";
+
+	public static void main(String[] args) {
+		final ParameterTool params = ParameterTool.fromArgs(args);
+		final Action action = Action.valueOf(params.getRequired("action"));
+		handlers.get(action).accept(params);
+	}
+
+	private static void listByFullPathPrefix(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3prefix = params.getRequired("s3prefix");
+		listByFullPathPrefix(bucket, s3prefix).forEach(System.out::println);
+	}
+
+	private static List<String> listByFullPathPrefix(final String bucket, final String s3prefix) {
+		return AmazonS3ClientBuilder.defaultClient().listObjects(bucket, s3prefix).getObjectSummaries()
+			.stream().map(S3ObjectSummary::getKey).collect(Collectors.toList());
+	}
+
+	private static void downloadFile(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3file = params.getRequired("s3file");
+		final String localFile = params.getRequired("localFile");
+		TransferManager tx = TransferManagerBuilder.defaultTransferManager();
+		try {
+			tx.download(bucket, s3file, new File(localFile)).waitForCompletion();
+		} catch (InterruptedException e) {
+			System.out.println("Transfer interrupted");
+		} finally {
+			tx.shutdownNow();
+		}
+	}
+
+	private static void downloadByFullPathAndFileNamePrefix(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3prefix = params.getRequired("s3prefix");
+		final String localFolder = params.getRequired("localFolder");
+		final String s3filePrefix = params.get("s3filePrefix", "");
+		TransferManager tx = TransferManagerBuilder.defaultTransferManager();
+		Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
+		KeyFilter keyFilter = s3filePrefix.isEmpty() ? KeyFilter.INCLUDE_ALL :
+			objectSummary -> keyPredicate.test(objectSummary.getKey());
+		try {
+			tx.downloadDirectory(bucket, s3prefix, new File(localFolder), keyFilter).waitForCompletion();
+		} catch (InterruptedException e) {
+			System.out.println("Transfer interrupted");
+		} finally {
+			tx.shutdownNow();
+		}
+	}
+
+	private static Predicate<String> getKeyFilterByFileNamePrefix(String s3filePrefix) {
+		if (s3filePrefix.isEmpty()) {
+			return key -> true;
+		} else {
+			return key -> {
+				String[] parts = key.split("/");
+				String fileName = parts[parts.length - 1];
+				return fileName.startsWith(s3filePrefix);
+			};
+		}
+	}
+
+	private static void deleteFile(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3file = params.getRequired("s3file");
+		AmazonS3ClientBuilder.defaultClient().deleteObject(bucket, s3file);
+	}
+
+	private static void deleteByFullPathPrefix(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3prefix = params.getRequired("s3prefix");
+		String[] keys = listByFullPathPrefix(bucket, s3prefix).toArray(new String[] {});
+		if (keys.length > 0) {
+			DeleteObjectsRequest request = new DeleteObjectsRequest(bucket).withKeys(keys);
+			AmazonS3ClientBuilder.defaultClient().deleteObjects(request);
+		}
+	}
+
+	private static void numberOfLinesInFile(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3file = params.getRequired("s3file");
+		AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient();
+		System.out.print(S3QueryUtil.queryFile(s3client, bucket, s3file, countQuery));
+		s3client.shutdown();
+	}
+
+	private static void numberOfLinesInFilesWithFullAndNamePrefix(ParameterTool params) {
+		final String bucket = params.getRequired("bucket");
+		final String s3prefix = params.getRequired("s3prefix");
+		final String s3filePrefix = params.get("s3filePrefix", "");
+		int parallelism = params.getInt("parallelism", 10);
+
+		List<String> files = listByFullPathPrefix(bucket, s3prefix);
+
+		ExecutorService executor = Executors.newFixedThreadPool(parallelism);
+		AmazonS3 s3client = AmazonS3ClientBuilder.defaultClient();
+		List<CompletableFuture<Integer>> requests =
+			submitLineCountingRequestsForFilesAsync(executor, s3client, bucket, files, s3filePrefix);
+		int count = waitAndComputeTotalLineCountResult(requests);
+
+		executor.shutdownNow();
+		s3client.shutdown();
+		System.out.print(count);
+	}
+
+	private static List<CompletableFuture<Integer>> submitLineCountingRequestsForFilesAsync(
+			ExecutorService executor, AmazonS3 s3client, String bucket, List<String> files, String s3filePrefix) {
+		List<CompletableFuture<Integer>> requests = new ArrayList<>();
+		Predicate<String> keyPredicate = getKeyFilterByFileNamePrefix(s3filePrefix);
+		files.forEach(file -> {
+			if (keyPredicate.test(file)) {
+				CompletableFuture<Integer> result = new CompletableFuture<>();
+				executor.execute(() ->
+					result.complete(Integer.parseInt(S3QueryUtil.queryFile(s3client, bucket, file, countQuery))));
+				requests.add(result);
+			}
+		});
+		return requests;
+	}
+
+	private static int waitAndComputeTotalLineCountResult(List<CompletableFuture<Integer>> requests) {
+		int count = 0;
+		for (CompletableFuture<Integer> result : requests) {
+			try {
+				count += result.get();
+			} catch (Throwable e) {
+				System.out.println("Failed to count lines");
+				e.printStackTrace();
+			}
+		}
+		return count;
+	}
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 382239fcf31ea593d04af2570ab0940024678dd9..07b65f9a1e3a86054e347e277157c493903a1557 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -55,6 +55,7 @@ under the License.
 		<module>flink-sql-client-test</module>
 		<module>flink-streaming-file-sink-test</module>
 		<module>flink-state-evolution-test</module>
+		<module>flink-e2e-test-utils</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh
index 6a6c8b6729b4accf18c88c61673b47cb1e8c575e..832bdeef93a4339148cd6a51fefed6d2afbb9685 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -101,6 +101,7 @@ run_test "DataSet allround end-to-end test" "$END_TO_END_DIR/test-scripts/test_b
 run_test "Streaming SQL end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_sql.sh" "skip_check_exceptions"
 run_test "Streaming bucketing end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_bucketing.sh" "skip_check_exceptions"
 run_test "Streaming File Sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh" "skip_check_exceptions"
+run_test "Streaming File Sink s3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" "skip_check_exceptions"
 run_test "Stateful stream job upgrade end-to-end test" "$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR/test-scripts/test_local_recovery_and_scheduling.sh 4 3 file false false" "skip_check_exceptions"
 
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index bdf6f64c5a8f2f8a889fabf92d590b80df4cd41a..4f628fc6d124009ae15226413a6f626efc301252 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -462,43 +462,6 @@ function check_result_hash {
   fi
 }
 
-function s3_put {
-  local_file=$1
-  bucket=$2
-  s3_file=$3
-  resource="/${bucket}/${s3_file}"
-  contentType="application/octet-stream"
-  dateValue=`date -R`
-  stringToSign="PUT\n\n${contentType}\n${dateValue}\n${resource}"
-  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
-  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
-  signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
-  curl -X PUT -T "${local_file}" \
-    -H "Host: ${bucket}.s3.amazonaws.com" \
-    -H "Date: ${dateValue}" \
-    -H "Content-Type: ${contentType}" \
-    -H "Authorization: AWS ${s3Key}:${signature}" \
-    https://${bucket}.s3.amazonaws.com/${s3_file}
-}
-
-function s3_delete {
-  bucket=$1
-  s3_file=$2
-  resource="/${bucket}/${s3_file}"
-  contentType="application/octet-stream"
-  dateValue=`date -R`
-  stringToSign="DELETE\n\n${contentType}\n${dateValue}\n${resource}"
-  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
-  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
-  signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
-  curl -X DELETE \
-    -H "Host: ${bucket}.s3.amazonaws.com" \
-    -H "Date: ${dateValue}" \
-    -H "Content-Type: ${contentType}" \
-    -H "Authorization: AWS ${s3Key}:${signature}" \
-    https://${bucket}.s3.amazonaws.com/${s3_file}
-}
-
 # This function starts the given number of task managers and monitors their processes.
 # If a task manager process goes away a replacement is started.
 function tm_watchdog {
@@ -692,3 +655,20 @@ function expect_in_taskmanager_logs {
     fi
   done
 }
+
+function wait_for_restart_to_complete {
+  local base_num_restarts=$1
+  local jobid=$2
+
+  local current_num_restarts=${base_num_restarts}
+  local expected_num_restarts=$((current_num_restarts + 1))
+
+  echo "Waiting for restart to happen"
+  while ! [[ ${current_num_restarts} -eq ${expected_num_restarts} ]]; do
+    sleep 5
+    current_num_restarts=$(get_job_metric ${jobid} "fullRestarts")
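+    # the metric may not be reported yet, stick to the base count in that case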
+    if [[ -z ${current_num_restarts} ]]; then
+      current_num_restarts=${base_num_restarts}
+    fi
+  done
+}
diff --git a/flink-end-to-end-tests/test-scripts/common_s3.sh b/flink-end-to-end-tests/test-scripts/common_s3.sh
new file mode 100644
index 0000000000000000000000000000000000000000..5c16bb75bea43fd54b7ff8d8b3fd85a10f248381
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/common_s3.sh
@@ -0,0 +1,240 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
+  echo "Did not find AWS environment variables, NOT running the e2e test."
+  exit 0
+else
+  echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running the e2e test."
+fi
+
+if [[ -z "$ARTIFACTS_AWS_ACCESS_KEY" ]]; then
+  echo "Did not find AWS environment variables, NOT running the e2e test."
+  exit 0
+else
+  echo "Found AWS access key, running the e2e test."
+fi
+
+if [[ -z "$ARTIFACTS_AWS_SECRET_KEY" ]]; then
+  echo "Did not find AWS environment variables, NOT running the e2e test."
+  exit 0
+else
+  echo "Found AWS secret key, running the e2e test."
+fi
+
+AWS_REGION="${AWS_REGION:-eu-west-1}"
+AWS_ACCESS_KEY=$ARTIFACTS_AWS_ACCESS_KEY
+AWS_SECRET_KEY=$ARTIFACTS_AWS_SECRET_KEY
+
+s3util="java -jar ${END_TO_END_DIR}/flink-e2e-test-utils/target/S3UtilProgram.jar"
+
+###################################
+# Setup Flink s3 access.
+#
+# Globals:
+#   FLINK_DIR
+#   ARTIFACTS_AWS_ACCESS_KEY
+#   ARTIFACTS_AWS_SECRET_KEY
+# Arguments:
+#   None
+# Returns:
+#   None
+###################################
+function s3_setup {
+  # make sure we remove the filesystem jar and the credentials at the end
+  function s3_cleanup {
+    rm $FLINK_DIR/lib/flink-s3-fs*.jar
+
+    # remove any leftover settings
+    sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+    sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
+  }
+  trap s3_cleanup EXIT
+
+  cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
+  echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+  echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+}
+
+s3_setup
+
+###################################
+# List s3 objects by full path prefix.
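+# The keys are fetched with the S3UtilProgram listByFullPathPrefix action.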
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - s3 full path key prefix
+# Returns:
+#   List of s3 object keys, separated by newline
+###################################
+function s3_list {
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action listByFullPathPrefix --s3prefix "$1" --bucket $ARTIFACTS_AWS_BUCKET
+}
+
+###################################
+# Download s3 object.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - local path to save file
+#   $2 - s3 object key
+# Returns:
+#   None
+###################################
+function s3_get {
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action downloadFile --localFile "$1" --s3file "$2" --bucket $ARTIFACTS_AWS_BUCKET
+}
+
+###################################
+# Download s3 objects to folder by full path prefix.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - local path to save folder with files
+#   $2 - s3 key full path prefix
+#   $3 - s3 file name prefix w/o directory to filter files by name (optional)
+# Returns:
+#   None
+###################################
+function s3_get_by_full_path_and_filename_prefix {
+  local file_prefix="${3-}"
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action downloadByFullPathAndFileNamePrefix \
+    --localFolder "$1" --s3prefix "$2" --s3filePrefix "${file_prefix}" --bucket $ARTIFACTS_AWS_BUCKET
+}
+
+###################################
+# Upload file to s3 object.
+#
+# Globals:
+#   ARTIFACTS_AWS_ACCESS_KEY
+#   ARTIFACTS_AWS_SECRET_KEY
+# Arguments:
+#   $1 - local file to upload
+#   $2 - s3 bucket
+#   $3 - s3 object key
+# Returns:
+#   None
+###################################
+function s3_put {
+  local_file=$1
+  bucket=$2
+  s3_file=$3
+  resource="/${bucket}/${s3_file}"
+  contentType="application/octet-stream"
+  dateValue=`date -R`
+  stringToSign="PUT\n\n${contentType}\n${dateValue}\n${resource}"
+  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
+  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
+  signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
+  curl -X PUT -T "${local_file}" \
+    -H "Host: ${bucket}.s3.amazonaws.com" \
+    -H "Date: ${dateValue}" \
+    -H "Content-Type: ${contentType}" \
+    -H "Authorization: AWS ${s3Key}:${signature}" \
+    https://${bucket}.s3.amazonaws.com/${s3_file}
+}
+
+###################################
+# Delete s3 object.
+#
+# Globals:
+#   ARTIFACTS_AWS_ACCESS_KEY
+#   ARTIFACTS_AWS_SECRET_KEY
+# Arguments:
+#   $1 - s3 bucket
+#   $2 - s3 object key
+# Returns:
+#   None
+###################################
+function s3_delete {
+  bucket=$1
+  s3_file=$2
+  resource="/${bucket}/${s3_file}"
+  contentType="application/octet-stream"
+  dateValue=`date -R`
+  stringToSign="DELETE\n\n${contentType}\n${dateValue}\n${resource}"
+  s3Key=$ARTIFACTS_AWS_ACCESS_KEY
+  s3Secret=$ARTIFACTS_AWS_SECRET_KEY
+  signature=`echo -en ${stringToSign} | openssl sha1 -hmac ${s3Secret} -binary | base64`
+  curl -X DELETE \
+    -H "Host: ${bucket}.s3.amazonaws.com" \
+    -H "Date: ${dateValue}" \
+    -H "Content-Type: ${contentType}" \
+    -H "Authorization: AWS ${s3Key}:${signature}" \
+    https://${bucket}.s3.amazonaws.com/${s3_file}
+}
+
+###################################
+# Delete s3 objects by full path prefix.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - s3 key full path prefix
+# Returns:
+#   None
+###################################
+function s3_delete_by_full_path_prefix {
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action deleteByFullPathPrefix --s3prefix "$1" --bucket $ARTIFACTS_AWS_BUCKET
+}
+
+###################################
+# Count number of lines in a file of s3 object.
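+# The count is computed server-side with the S3 Select query
+# "select count(*) from s3object".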
+# The lines have to be simple enough to comply with the CSV format,
+# because SQL is used to query the s3 object.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - s3 file object key
+# Returns:
+#   The line count, printed to stdout
+###################################
+function s3_get_number_of_lines_in_file {
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action numberOfLinesInFile --s3file "$1" --bucket $ARTIFACTS_AWS_BUCKET
+}
+
+###################################
+# Count number of lines in files of s3 objects filtered by prefix.
+# The lines have to be simple enough to comply with the CSV format,
+# because SQL is used to query the s3 objects.
+#
+# Globals:
+#   ARTIFACTS_AWS_BUCKET
+# Arguments:
+#   $1 - s3 key prefix
+#   $2 - s3 file name prefix w/o directory to filter files by name (optional)
+# Returns:
+#   The total line count, printed to stdout
+###################################
+function s3_get_number_of_lines_by_prefix {
+  local file_prefix="${2-}"
+  AWS_REGION=$AWS_REGION \
+  ${s3util} --action numberOfLinesInFilesWithFullAndNamePrefix \
+    --s3prefix "$1" --s3filePrefix "${file_prefix}" --bucket $ARTIFACTS_AWS_BUCKET
+}
diff --git a/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh b/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
index e5ac5bcecdce93ef5657fbc5b65ccd621d3d2842..3d8386758526a88ec03c5e49cb8d47dd7b4653d8 100755
--- a/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
+++ b/flink-end-to-end-tests/test-scripts/test_shaded_hadoop_s3a.sh
@@ -19,33 +19,18 @@
 
 # Tests for our shaded/bundled Hadoop S3A file system.
 
-if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
-  echo "Did not find AWS environment variables, NOT running Shaded Hadoop S3A e2e tests."
-  exit 0
-else
-  echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running Shaded Hadoop S3A e2e tests."
-fi
-
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_s3.sh
 
 s3_put $TEST_INFRA_DIR/test-data/words $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-s3a
 
 # make sure we delete the file at the end
-function s3_cleanup {
+function shaded_s3a_cleanup {
   s3_delete $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-s3a
-  rm $FLINK_DIR/lib/flink-s3-fs*.jar
-
-  # remove any leftover settings
-  sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
-  sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
 }
-trap s3_cleanup EXIT
-
-cp $FLINK_DIR/opt/flink-s3-fs-hadoop-*.jar $FLINK_DIR/lib/
-echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
-echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+trap shaded_s3a_cleanup EXIT
 
 start_cluster
 
 $FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input s3:/$resource --output $TEST_DATA_DIR/out/wc_out
 
-check_result_hash "WordCountWithShadedS3A" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
\ No newline at end of file
+check_result_hash "WordCountWithShadedS3A" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
diff --git a/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh b/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
index 4092805dc278f630b8df44e1dc9f19c863549a3c..bd33b410dfd4ab3e239158f5a5577fa3c0fef5d5 100755
--- a/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
+++ b/flink-end-to-end-tests/test-scripts/test_shaded_presto_s3.sh
@@ -19,33 +19,18 @@
 
 # Tests for our shaded/bundled Hadoop S3A file system.
-if [[ -z "$ARTIFACTS_AWS_BUCKET" ]]; then
-  echo "Did not find AWS environment variables, NOT running Shaded Presto S3 e2e tests."
-  exit 0
-else
-  echo "Found AWS bucket $ARTIFACTS_AWS_BUCKET, running Shaded Presto S3 e2e tests."
-fi
-
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/common_s3.sh
 
 s3_put $TEST_INFRA_DIR/test-data/words $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-presto-s3
 
 # make sure we delete the file at the end
-function s3_cleanup {
+function shaded_presto_s3_cleanup {
   s3_delete $ARTIFACTS_AWS_BUCKET flink-end-to-end-test-shaded-presto-s3
-  rm $FLINK_DIR/lib/flink-s3-fs*.jar
 }
-trap s3_cleanup EXIT
-
-cp $FLINK_DIR/opt/flink-s3-fs-presto-*.jar $FLINK_DIR/lib/
-echo "s3.access-key: $ARTIFACTS_AWS_ACCESS_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
-echo "s3.secret-key: $ARTIFACTS_AWS_SECRET_KEY" >> "$FLINK_DIR/conf/flink-conf.yaml"
+trap shaded_presto_s3_cleanup EXIT
 
 start_cluster
 
 $FLINK_DIR/bin/flink run -p 1 $FLINK_DIR/examples/batch/WordCount.jar --input s3:/$resource --output $TEST_DATA_DIR/out/wc_out
 
 check_result_hash "WordCountWithShadedPrestoS3" $TEST_DATA_DIR/out/wc_out "72a690412be8928ba239c2da967328a5"
-
-# remove any leftover settings
-sed -i -e 's/s3.access-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
-sed -i -e 's/s3.secret-key: .*//' "$FLINK_DIR/conf/flink-conf.yaml"
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
index 17389ad6f4ec250153463d48ade3e32a4468b3f8..50f5afc1312563be90c491c87542f5afabf86eac 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
@@ -17,27 +17,37 @@
 # limitations under the License.
 ################################################################################
 
-source "$(dirname "$0")"/common.sh
-
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
+OUT_TYPE="${1:-local}" # other type: s3
 
-OUTPUT_PATH="$TEST_DATA_DIR/out"
-
-function wait_for_restart {
-  local base_num_restarts=$1
-
-  local current_num_restarts=${base_num_restarts}
-  local expected_num_restarts=$((current_num_restarts + 1))
+source "$(dirname "$0")"/common.sh
+if [ "${OUT_TYPE}" == "s3" ]; then
+  # common_s3.sh exits the test early (with success) when the AWS environment
+  # variables are not set, so it is only sourced for the s3 variant
+  source "$(dirname "$0")"/common_s3.sh
+fi
+
+OUT=out
+OUTPUT_PATH="$TEST_DATA_DIR/$OUT"
+S3_OUTPUT_PATH="s3://$ARTIFACTS_AWS_BUCKET/$OUT"
+
+mkdir -p $OUTPUT_PATH
+
+if [ "${OUT_TYPE}" == "local" ]; then
+  echo "Use local output"
+  JOB_OUTPUT_PATH=${OUTPUT_PATH}
+elif [ "${OUT_TYPE}" == "s3" ]; then
+  echo "Use s3 output"
+  JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
+else
+  echo "Unknown output type: ${OUT_TYPE}"
+  exit 1
+fi
 
-  echo "Waiting for restart to happen"
-  while ! [[ ${current_num_restarts} -eq ${expected_num_restarts} ]]; do
-    sleep 5
-    current_num_restarts=$(get_job_metric ${JOB_ID} "fullRestarts")
-    if [[ -z ${current_num_restarts} ]]; then
-      current_num_restarts=${base_num_restarts}
-    fi
-  done
+# make sure we delete the s3 output at the end
+function out_cleanup {
+  s3_delete_by_full_path_prefix $OUT
 }
+if [ "${OUT_TYPE}" == "s3" ]; then
+  trap out_cleanup EXIT
+fi
+
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
 
 ###################################
 # Get all lines in part files and sort them numerically.
@@ -47,10 +57,32 @@ function wait_for_restart {
 # Globals:
 #   OUTPUT_PATH
 # Arguments:
 #   None
 # Returns:
-#   None
+#   sorted content of part files
 ###################################
 function get_complete_result {
-  find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+  if [ "${OUT_TYPE}" == "s3" ]; then
+    rm -rf $OUTPUT_PATH; mkdir -p $OUTPUT_PATH
+    s3_get_by_full_path_and_filename_prefix ${TEST_DATA_DIR} "${OUT}" "part-"
+  fi
+  find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
+}
+
+###################################
+# Get total number of lines in part files.
+#
+# Globals:
+#   OUT
+# Arguments:
+#   None
+# Returns:
+#   line count of part files
+###################################
+function get_total_number_of_valid_lines {
+  if [ "${OUT_TYPE}" == "local" ]; then
+    get_complete_result | wc -l | tr -d '[:space:]'
+  elif [ "${OUT_TYPE}" == "s3" ]; then
+    s3_get_number_of_lines_by_prefix "${OUT}" "part-"
+  fi
 }
 
 ###################################
@@ -83,7 +115,7 @@ function wait_for_complete_result {
     sleep ${polling_interval}
     ((seconds_elapsed += ${polling_interval}))
 
-    number_of_values=$(get_complete_result | wc -l | tr -d '[:space:]')
+    number_of_values=$(get_total_number_of_valid_lines)
     if [[ ${previous_number_of_values} -ne ${number_of_values} ]]; then
       echo "Number of produced values ${number_of_values}/${expected_number_of_values}"
       previous_number_of_values=${number_of_values}
@@ -98,7 +130,7 @@ start_cluster
 "${FLINK_DIR}/bin/taskmanager.sh" start
 
 echo "Submitting job."
-CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${OUTPUT_PATH}")
+CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${JOB_OUTPUT_PATH}")
 JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
 
 if [[ -z $JOB_ID ]]; then
@@ -117,7 +149,7 @@ kill_random_taskmanager
 echo "Starting TM"
 "$FLINK_DIR/bin/taskmanager.sh" start
 
-wait_for_restart 0
+wait_for_restart_to_complete 0 ${JOB_ID}
 
 echo "Killing 2 TMs"
 kill_random_taskmanager
@@ -127,7 +159,7 @@ echo "Starting 2 TMs"
 "$FLINK_DIR/bin/taskmanager.sh" start
 "$FLINK_DIR/bin/taskmanager.sh" start
 
-wait_for_restart 1
+wait_for_restart_to_complete 1 ${JOB_ID}
 
 echo "Waiting until all values have been produced"
 wait_for_complete_result 60000 300