提交 8dc8ff3f 编写于 作者: P Peifeng Qiu 提交者: Adam Lee

s3ext: fix abnormal behavior when QEs are reset

In some cases QEs may be reset, then S3 extension stays on undefined
behaviors. It may or may not catch the transaction abort signal.
For example, when GPDB cannot parse imported data from S3, query
and transaction are aborted and reset as new QE context starts.

To solve it, we keep the signal of transaction cancel once it is
captured until we stop all running threads and exit gracefully.
Signed-off-by: NHaozhou Wang <hawang@pivotal.io>
上级 fe7ed492
......@@ -8,7 +8,8 @@ class S3Exception {
string file;
uint64_t line;
string func;
S3Exception() : line(0) {}
S3Exception() : line(0) {
}
virtual ~S3Exception() {
}
......
test: 0_00_prepare_s3_protocol
# ~ 1s
test: 1_03_bad_data
test: 1_04_empty_prefix
test: 1_05_one_line
test: 1_06_1correct_1wrong
test: 2_01_invalid_syntax
test: 2_02_invalid_region
test: 2_03_invalid_config
test: 2_04_invalid_header
test: 3_01_create_wet
test: 3_02_quick_shoot_wet
test: 4_01_create_invalid_wet
test: 1_03_bad_data 1_04_empty_prefix 1_05_one_line 1_06_1correct_1wrong 2_01_invalid_syntax 2_02_invalid_region 2_03_invalid_config 2_04_invalid_header 3_01_create_wet 3_02_quick_shoot_wet 4_01_create_invalid_wet
# tens of seconds
test: 1_01_normal 1_02_log_error 1_10_all_regions 1_11_gzipped_data 1_12_no_prefix 1_13_parallel1 1_13_parallel2 1_09_partition 3_09_write_big_row 3_10_write_mixed_length_rows
......
......@@ -34,9 +34,21 @@ Datum s3_import(PG_FUNCTION_ARGS);
* QueryCancelPending is not sufficient because it will be reset before the
* extprotocol last call, then it is hard to distinguish normal exit/finish
* from abnormal transaction abort.
*
* IsAbortInProgress() can also be reset by GPDB while downloading file, we
* must save the state
*/
static bool queryCancelFlag = false;
bool S3QueryIsAbortInProgress(void) {
return QueryCancelPending || IsAbortInProgress();
bool queryIsBeingCancelled = QueryCancelPending || IsAbortInProgress();
// We need a thread-safe way to query and set queryCancelFlag.
// queryCancelFlag is set to TRUE for the first time when cancel signal is
// detected. If cancel signal is already captured, value will not be
// swapped and queryIsCancelledAlready is true.
bool queryIsCancelledAlready = !__sync_bool_compare_and_swap(&queryCancelFlag, false, queryIsBeingCancelled);
return queryIsBeingCancelled || queryIsCancelledAlready;
}
/*
......@@ -77,12 +89,12 @@ Datum s3_import(PG_FUNCTION_ARGS) {
}
EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL);
PG_RETURN_INT32(0);
}
/* first call. do any desired init */
if (gpreader == NULL) {
queryCancelFlag = false;
const char *url_with_options = EXTPROTOCOL_GET_URL(fcinfo);
thread_setup();
......@@ -130,12 +142,12 @@ Datum s3_export(PG_FUNCTION_ARGS) {
}
EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL);
PG_RETURN_INT32(0);
}
/* first call. do any desired init */
if (gpwriter == NULL) {
queryCancelFlag = false;
const char *url_with_options = EXTPROTOCOL_GET_URL(fcinfo);
const char *format = get_format_str(fcinfo);
......
......@@ -22,8 +22,6 @@ void S3BucketReader::open(const S3Params& params) {
this->keyList =
this->s3interface->listBucket(this->schema, this->region, this->bucket, this->prefix);
return;
}
BucketContent& S3BucketReader::getNextKey() {
......
......@@ -85,8 +85,6 @@ void SignRequestV4(const string &method, HTTPHeaders *h, const string &orig_regi
<< ",Signature=" << signature_hex;
h->Add(AUTHORIZATION, signature_header.str());
return;
}
// getOptS3 returns first value according to given key.
......
......@@ -203,7 +203,6 @@ Response S3InterfaceService::getBucketResponse(const string &region, const strin
return this->getResponseWithRetries(url, header);
}
// parseBucketXML must not throw exception, otherwise result is leaked.
bool S3InterfaceService::parseBucketXML(ListBucketResult *result, xmlParserCtxtPtr xmlcontext,
string &marker) {
if ((result == NULL) || (xmlcontext == NULL)) {
......@@ -315,7 +314,8 @@ ListBucketResult S3InterfaceService::listBucket(const string &schema, const stri
ListBucketResult result;
string marker = "";
do { // To get next set(up to 1000) keys in one iteration.
do {
// To get next set(up to 1000) keys in one iteration.
// S3 requires query parameters specified alphabetically.
string url = this->getUrl(prefix, schema, host.str(), bucket, marker);
......
......@@ -208,8 +208,6 @@ void S3KeyReader::open(const S3Params& params) {
pthread_create(&thread, NULL, DownloadThreadFunc, &this->chunkBuffers[i]);
this->threads.push_back(thread);
}
return;
}
uint64_t S3KeyReader::read(char* buf, uint64_t count) {
......@@ -263,6 +261,4 @@ void S3KeyReader::close() {
}
this->reset();
return;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册