提交 beabf1a2 编写于 作者: P Peifeng Qiu 提交者: Adam Lee

s3ext: update names, comments and delete dead codes

Signed-off-by: NAdam Lee <ali@pivotal.io>
上级 dbea15a9
......@@ -23,8 +23,6 @@ struct S3Credential {
string secret;
};
enum Method { GET, PUT, POST, DELETE, HEAD };
void SignRequestV4(const string& method, HTTPHeaders* h, const string& orig_region,
const string& path, const string& query, const S3Credential& cred);
......@@ -32,4 +30,4 @@ string get_opt_s3(const string& options, const string& key);
string truncate_options(const string& url_with_options);
#endif // __S3_COMMON_H__
#endif // __S3_COMMON_H__
\ No newline at end of file
......@@ -42,7 +42,7 @@ class S3KeyWriter : public Writer {
}
protected:
static void* writerThread(void* p);
static void* UploadThreadFunc(void* p);
void flushBuffer();
void completeKeyWriting();
......
......@@ -59,10 +59,9 @@ ChunkBuffer& ChunkBuffer::operator=(const ChunkBuffer& other) {
// that's why it checks if leftLen is larger than *or equal to* len below[1], provides a chance ret
// is 0, which is smaller than len. Otherwise, other functions won't know when to read next buffer.
uint64_t ChunkBuffer::read(char* buf, uint64_t len) {
// QueryCancelPending stops s3_import(), this check is not needed if
// s3_import() every time calls ChunkBuffer->Read() only once, otherwise(as we did in
// downstreamReader->read() for decompression feature before), first call sets buffer to
// ReadyToFill, second call hangs.
// GPDB abort signal stops s3_import(), this check is not needed if s3_import() every time calls
// ChunkBuffer->Read() only once, otherwise(as we did in downstreamReader->read() for
// decompression feature before), first call sets buffer to ReadyToFill, second call hangs.
CHECK_OR_DIE_MSG(!S3QueryIsAbortInProgress(), "%s",
"ChunkBuffer reading is interrupted by user");
......
......@@ -77,33 +77,33 @@ struct ThreadParams {
uint64_t currentNumber;
};
void* S3KeyWriter::writerThread(void* p) {
ThreadParams* pack = (ThreadParams*)p;
S3KeyWriter* writer = pack->keyWriter;
void* S3KeyWriter::UploadThreadFunc(void* data) {
ThreadParams* params = (ThreadParams*)data;
S3KeyWriter* writer = params->keyWriter;
try {
S3DEBUG("Upload thread start: %p, part number: %" PRIu64 ", data size: %" PRIu64,
pthread_self(), pack->currentNumber, pack->data.size());
string etag = writer->s3interface->uploadPartOfData(pack->data, writer->url, writer->region,
writer->cred, pack->currentNumber,
writer->uploadId);
pthread_self(), params->currentNumber, params->data.size());
string etag = writer->s3interface->uploadPartOfData(
params->data, writer->url, writer->region, writer->cred, params->currentNumber,
writer->uploadId);
// when unique_lock destructs it will automatically unlock the mutex.
UniqueLock threadLock(&writer->mutex);
// etag is empty if the query is cancelled by user.
if (!etag.empty()) {
writer->etagList[pack->currentNumber] = etag;
writer->etagList[params->currentNumber] = etag;
}
writer->activeThreads--;
pthread_cond_broadcast(&writer->cv);
S3DEBUG("Upload part finish: %p, eTag: %s, part number: %" PRIu64, pthread_self(),
etag.c_str(), pack->currentNumber);
etag.c_str(), params->currentNumber);
} catch (std::exception& e) {
S3ERROR("Upload thread error: %s", e.what());
}
delete pack;
delete params;
return NULL;
}
......@@ -114,20 +114,19 @@ void S3KeyWriter::flushBuffer() {
pthread_cond_wait(&this->cv, &this->mutex);
}
// Most time query is canceled during uploadPartOfData. This is the first chance to cancel
// and clean up upload. Otherwise GPDB will call with LAST_CALL but QueryCancelPending is
// set to false, and we can't detect query cancel signal in S3KeyWriter::close().
// Most time query is canceled during uploadPartOfData(). This is the first chance to cancel
// and clean up upload.
this->checkQueryCancelSignal();
this->activeThreads++;
pthread_t thread;
pthread_t writerThread;
ThreadParams* params = new ThreadParams();
params->keyWriter = this;
params->data.swap(this->buffer);
params->currentNumber = ++this->partNumber;
pthread_create(&thread, NULL, writerThread, params);
threadList.emplace_back(thread);
pthread_create(&writerThread, NULL, UploadThreadFunc, params);
threadList.emplace_back(writerThread);
this->buffer.clear();
}
......
......@@ -40,7 +40,7 @@ size_t RESTfulServiceWriteFuncCallback(char *ptr, size_t size, size_t nmemb, voi
}
// cURL's write function callback, only used by DELETE request when query is canceled.
// It shouldn't be interrupted by QueryCancelPending.
// It shouldn't be interrupted.
size_t RESTfulServiceAbortFuncCallback(char *ptr, size_t size, size_t nmemb, void *userp) {
size_t realsize = size * nmemb;
Response *resp = (Response *)userp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册