提交 e9f90691 编写于 作者: H Haozhou Wang 提交者: Adam Lee

s3ext: Support cancel operation during uploading

1. Support "CANCEL" during uploading, the uploaded parts in S3 will be safely deleted.
2. The s3ext will not retry 3 times after the query is canceled by users.
3. Error messages are fixed.
Signed-off-by: NHaozhou Wang <hawang@pivotal.io>
上级 fa1620d1
......@@ -48,6 +48,15 @@ class S(BaseHTTPRequestHandler):
self._set_headers(length)
self.wfile.write(content)
def do_DELETE(self):
# Just bounce the request back
print "----- SOMETHING WAS DELETE ------"
print self.headers
length = int(self.headers['Content-Length'])
# content = self.rfile.read(length)
self._set_headers(length)
self.wfile.write("")
def run(server_class=HTTPServer, handler_class=S, port=8553):
server_address = ('', port)
handler_class.protocol_version = 'HTTP/1.1'
......
......@@ -17,17 +17,18 @@ enum ResponseStatus {
RESPONSE_OK, // everything is OK
RESPONSE_FAIL, // curl failed (i.e., the status is not CURLE_OK)
RESPONSE_ERROR, // server error (server return code is not 200)
RESPONSE_ABORT, // the query has been abort by user
};
typedef long ResponseCode;
#define HeadResponseFail -1
// 2XX are successful response.
// Here we deal with 200 (OK) and 206 (partial content) currently.
//
// Here we deal with 200 (OK), 204 (no content) and 206 (partial content) currently.
// 204 is for DELETE request.
// We may move this function to RESTfulService() in future
inline bool isSuccessfulResponse(ResponseCode code) {
return (code == 200 || code == 206);
return (code == 200 || code == 206 || code == 204);
}
struct UploadData {
......@@ -136,6 +137,9 @@ class RESTfulService {
virtual ResponseCode head(const string& url, HTTPHeaders& headers,
const map<string, string>& params) = 0;
virtual Response deleteRequest(const string& url, HTTPHeaders& headers,
const map<string, string>& params) = 0;
};
#endif /* INCLUDE_RESTFUL_SERVICE_H_ */
......@@ -118,6 +118,11 @@ class S3Interface {
const vector<string>& etagArray) {
throw std::runtime_error("Default implementation must not be called.");
}
virtual bool abortUpload(const string& keyUrl, const string& region, const S3Credential& cred,
const string& uploadId) {
throw std::runtime_error("Default implementation must not be called.");
}
};
class S3Service : public S3Interface {
......@@ -164,6 +169,9 @@ class S3Service : public S3Interface {
bool completeMultiPart(const string& keyUrl, const string& region, const S3Credential& cred,
const string& uploadId, const vector<string>& etagArray);
bool abortUpload(const string& keyUrl, const string& region, const S3Credential& cred,
const string& uploadId);
private:
string getUrl(const string& prefix, const string& schema, const string& host,
const string& bucket, const string& marker);
......
......@@ -16,7 +16,7 @@ using std::string;
class WriterBuffer : public vector<uint8_t> {};
class S3KeyWriter : Writer {
class S3KeyWriter : public Writer {
public:
S3KeyWriter() : s3interface(NULL), chunkSize(0) {
}
......@@ -40,6 +40,7 @@ class S3KeyWriter : Writer {
protected:
void flushBuffer();
void completeKeyWriting();
void checkQueryCancel();
WriterBuffer buffer;
S3Interface* s3interface;
......
......@@ -17,6 +17,9 @@ class S3RESTfulService : public RESTfulService {
Response post(const string& url, HTTPHeaders& headers, const map<string, string>& params,
const vector<uint8_t>& data);
Response deleteRequest(const string& url, HTTPHeaders& headers,
const map<string, string>& params);
};
#endif /* INCLUDE_S3RESTFUL_SERVICE_H_ */
......@@ -49,7 +49,7 @@ Response S3Service::getResponseWithRetries(const string &url, HTTPHeaders &heade
while (retries--) {
// declare response here to leverage RVO (Return Value Optimization)
Response response = this->restfulService->get(url, headers, params);
if (response.isSuccess() || (retries == 0)) {
if (response.isSuccess() || (retries == 0) || QueryCancelPending) {
return response;
};
......@@ -66,7 +66,7 @@ Response S3Service::putResponseWithRetries(const string &url, HTTPHeaders &heade
while (retries--) {
// declare response here to leverage RVO (Return Value Optimization)
Response response = this->restfulService->put(url, headers, params, data);
if (response.isSuccess() || (retries == 0)) {
if (response.isSuccess() || (retries == 0) || QueryCancelPending) {
return response;
};
......@@ -83,7 +83,7 @@ Response S3Service::postResponseWithRetries(const string &url, HTTPHeaders &head
while (retries--) {
// declare response here to leverage RVO (Return Value Optimization)
Response response = this->restfulService->post(url, headers, params, data);
if (response.isSuccess() || (retries == 0)) {
if (response.isSuccess() || (retries == 0) || QueryCancelPending) {
return response;
};
......@@ -109,7 +109,7 @@ ResponseCode S3Service::headResponseWithRetries(const string &url, HTTPHeaders &
while (retries--) {
response = this->restfulService->head(url, headers, params);
if (!isHeadResponseCodeNeedRetry(response) || (retries == 0)) {
if (!isHeadResponseCodeNeedRetry(response) || (retries == 0) || QueryCancelPending) {
return response;
};
......@@ -492,6 +492,7 @@ string S3Service::uploadPartOfData(vector<uint8_t> &data, const string &keyUrl,
urlWithQuery << keyUrl << "?partNumber=" << partNumber << "&uploadId=" << uploadId;
Response resp = this->putResponseWithRetries(urlWithQuery.str(), headers, params, data);
if (resp.getStatus() == RESPONSE_OK) {
string headers(resp.getRawHeaders().begin(), resp.getRawHeaders().end());
......@@ -508,6 +509,8 @@ string S3Service::uploadPartOfData(vector<uint8_t> &data, const string &keyUrl,
if (!s3msg.getMessage().empty()) {
S3ERROR("Amazon S3 returns error \"%s\"", s3msg.getMessage().c_str());
}
} else if (resp.getStatus() == RESPONSE_ABORT) {
return "";
}
// getStatus == RESPONSE_ERROR || RESPONSE_FAIL
......@@ -558,6 +561,49 @@ bool S3Service::completeMultiPart(const string &keyUrl, const string &region,
Response resp = this->postResponseWithRetries(
urlWithQuery.str(), headers, params, vector<uint8_t>(bodyString.begin(), bodyString.end()));
if (resp.getStatus() == RESPONSE_OK) {
return true;
} else if (resp.getStatus() == RESPONSE_ERROR) {
S3MessageParser s3msg(resp);
if (!s3msg.getMessage().empty()) {
S3ERROR("Amazon S3 returns error \"%s\"", s3msg.getMessage().c_str());
}
} else if (resp.getStatus() == RESPONSE_ABORT) {
return false;
}
// getStatus == RESPONSE_ERROR || RESPONSE_FAIL
S3ERROR("Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
return false;
}
bool S3Service::abortUpload(const string &keyUrl, const string &region, const S3Credential &cred,
const string &uploadId) {
HTTPHeaders headers;
map<string, string> params;
UrlParser parser(keyUrl);
stringstream queryString;
headers.Add(HOST, parser.getHost());
headers.Add(CONTENTTYPE, "text/plain");
headers.Add(X_AMZ_CONTENT_SHA256, "UNSIGNED-PAYLOAD");
headers.Add(CONTENTLENGTH, "0");
queryString << "uploadId=" << uploadId;
// DELETE /ObjectName?uploadId=UploadId HTTP/1.1
SignRequestV4("DELETE", &headers, region, parser.getPath(), queryString.str(), cred);
stringstream urlWithQuery;
urlWithQuery << keyUrl << "?uploadId=" << uploadId;
Response resp = this->restfulService->deleteRequest(urlWithQuery.str(), headers, params);
if (resp.getStatus() == RESPONSE_OK) {
return true;
} else if (resp.getStatus() == RESPONSE_ERROR) {
......
......@@ -63,7 +63,7 @@ uint64_t ChunkBuffer::read(char* buf, uint64_t len) {
// s3_import() every time calls ChunkBuffer->Read() only once, otherwise(as we did in
// downstreamReader->read() for decompression feature before), first call sets buffer to
// ReadyToFill, second call hangs.
CHECK_OR_DIE_MSG(!QueryCancelPending, "%s", "ChunkBuffer reading is interrupted by GPDB");
CHECK_OR_DIE_MSG(!QueryCancelPending, "%s", "ChunkBuffer reading is interrupted by user");
pthread_mutex_lock(&this->statusMutex);
while (this->status != ReadyToRead) {
......@@ -159,10 +159,10 @@ void* DownloadThreadFunc(void* data) {
S3DEBUG("Downloading thread starts");
do {
if (QueryCancelPending) {
S3INFO("Downloading thread is interrupted by GPDB");
S3INFO("Downloading thread is interrupted by user");
// error is shared between all chunks, so all chunks will stop.
buffer->setSharedError(true, "Downloading thread is interrupted by GPDB");
buffer->setSharedError(true, "Downloading thread is interrupted by user");
// have to unlock ChunkBuffer::read in some certain conditions, for instance, status is
// not ReadyToRead, and read() is waiting for signal stat_cond.
......
......@@ -23,6 +23,7 @@ void S3KeyWriter::open(const WriterParams &params) {
// errors.
uint64_t S3KeyWriter::write(char *buf, uint64_t count) {
CHECK_OR_DIE(buf != NULL);
this->checkQueryCancel();
// GPDB issues 64K- block every time and chunkSize is 8MB+
if (count > this->chunkSize) {
......@@ -41,11 +42,22 @@ uint64_t S3KeyWriter::write(char *buf, uint64_t count) {
// This should be reentrant, has no side effects when called multiple times.
void S3KeyWriter::close() {
this->checkQueryCancel();
if (!this->uploadId.empty()) {
this->completeKeyWriting();
}
}
void S3KeyWriter::checkQueryCancel() {
if (QueryCancelPending && !this->uploadId.empty()) {
this->s3interface->abortUpload(this->url, this->region, this->cred, this->uploadId);
this->etagList.clear();
this->uploadId.clear();
CHECK_OR_DIE_MSG(false, "%s", "Upload is interrupted by user");
}
}
void S3KeyWriter::flushBuffer() {
if (!this->buffer.empty()) {
string etag = this->s3interface->uploadPartOfData(
......@@ -54,6 +66,12 @@ void S3KeyWriter::flushBuffer() {
etagList.push_back(etag);
this->buffer.clear();
// most time query is canceled during uploadPartOfData,
// this is the first chance to cancel and clean up upload.
// Otherwise GPDB will call with LAST_CALL but QueryCancelPending is set to false.
// and we can't detect query cancel in S3KeyWriter::close.
this->checkQueryCancel();
}
}
......
......@@ -39,6 +39,15 @@ size_t RESTfulServiceWriteFuncCallback(char *ptr, size_t size, size_t nmemb, voi
return realsize;
}
// curl's write function callback, used only by DELETE request when query is canceled.
// It shouldn't be interrupted by QueryCancelPending.
size_t RESTfulServiceAbortFuncCallback(char *ptr, size_t size, size_t nmemb, void *userp) {
size_t realsize = size * nmemb;
Response *resp = (Response *)userp;
resp->appendDataBuffer(ptr, realsize);
return realsize;
}
// curl's headers write function callback.
size_t RESTfulServiceHeadersWriteFuncCallback(char *ptr, size_t size, size_t nmemb, void *userp) {
if (QueryCancelPending) {
......@@ -186,9 +195,16 @@ Response S3RESTfulService::put(const string &url, HTTPHeaders &headers,
if (res != CURLE_OK) {
S3ERROR("curl_easy_perform() failed: %s", curl_easy_strerror(res));
response.clearBuffers();
if (QueryCancelPending) {
response.setStatus(RESPONSE_ABORT);
response.setMessage("Query cancelled by user");
} else {
response.setStatus(RESPONSE_FAIL);
response.setMessage(string("Server connection failed: ").append(curl_easy_strerror(res)));
response.setMessage(
string("Server connection failed: ").append(curl_easy_strerror(res)));
}
response.clearBuffers();
} else {
long responseCode;
......@@ -257,9 +273,16 @@ Response S3RESTfulService::post(const string &url, HTTPHeaders &headers,
if (res != CURLE_OK) {
S3ERROR("curl_easy_perform() failed: %s", curl_easy_strerror(res));
response.clearBuffers();
if (QueryCancelPending) {
response.setStatus(RESPONSE_ABORT);
response.setMessage("Query cancelled by user");
} else {
response.setStatus(RESPONSE_FAIL);
response.setMessage(string("Server connection failed: ").append(curl_easy_strerror(res)));
response.setMessage(
string("Server connection failed: ").append(curl_easy_strerror(res)));
}
response.clearBuffers();
} else {
long responseCode;
......@@ -331,3 +354,77 @@ ResponseCode S3RESTfulService::head(const string &url, HTTPHeaders &headers,
return responseCode;
}
Response S3RESTfulService::deleteRequest(const string &url, HTTPHeaders &headers,
const map<string, string> &params) {
Response response;
CURL *curl = curl_easy_init();
CHECK_OR_DIE_MSG(curl != NULL, "%s", "Failed to create curl handler");
headers.CreateList();
/* options for downloading */
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 1L);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.GetList());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&response);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, RESTfulServiceAbortFuncCallback);
curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE");
vector<uint8_t> data;
UploadData uploadData(data);
curl_easy_setopt(curl, CURLOPT_READDATA, (void *)&uploadData);
curl_easy_setopt(curl, CURLOPT_READFUNCTION, RESTfulServiceReadFuncCallback);
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)data.size());
// consider low speed as timeout
curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, s3ext_low_speed_limit);
curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, s3ext_low_speed_time);
map<string, string>::const_iterator iter = params.find("debug");
if (iter != params.end() && iter->second == "true") {
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
}
if (s3ext_debug_curl) {
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
}
CURLcode res = curl_easy_perform(curl);
if (res != CURLE_OK) {
S3ERROR("curl_easy_perform() failed: %s", curl_easy_strerror(res));
response.clearBuffers();
response.setStatus(RESPONSE_FAIL);
response.setMessage(string("Server connection failed: ").append(curl_easy_strerror(res)));
} else {
long responseCode;
// Get the HTTP response status code from HTTP header
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &responseCode);
// 2XX are successful response. Here we deal with 200 (OK) 204 (no content), and 206
// (partial content)
// firstly.
if (isSuccessfulResponse(responseCode)) {
response.setStatus(RESPONSE_OK);
response.setMessage("Success");
} else { // Server error, set status to RESPONSE_ERROR
stringstream sstr;
sstr << "S3 server returned error, error code is " << responseCode;
response.setStatus(RESPONSE_ERROR);
response.setMessage(sstr.str());
}
}
curl_easy_cleanup(curl);
headers.FreeList();
return response;
}
......@@ -38,6 +38,10 @@ class MockS3Interface : public S3Interface {
MOCK_METHOD5(completeMultiPart, bool(const string& keyUrl, const string& region, const S3Credential& cred,
const string& uploadId, const vector<string>& etagArray));
MOCK_METHOD4(abortUpload, bool(const string &keyUrl, const string &region, const S3Credential &cred,
const string &uploadId));
};
class MockS3RESTfulService : public S3RESTfulService {
......@@ -53,6 +57,9 @@ class MockS3RESTfulService : public S3RESTfulService {
MOCK_METHOD4(post, Response(const string &url, HTTPHeaders &headers,
const map<string, string> &params,const vector<uint8_t> &data));
MOCK_METHOD3(deleteRequest, Response(const string &url, HTTPHeaders &headers,
const map<string, string> &params));
};
class XMLGenerator {
......
......@@ -667,16 +667,24 @@ TEST_F(S3ServiceTest, uploadPartOfDataErrorResponse) {
std::runtime_error);
}
TEST_F(S3ServiceTest, uploadPartOfDataAbortResponse) {
vector<uint8_t> raw;
raw.resize(100);
Response response(RESPONSE_ABORT, raw);
EXPECT_CALL(mockRestfulService, put(_, _, _, _)).WillRepeatedly(Return(response));
EXPECT_EQ("", this->uploadPartOfData(
raw, "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever", region,
cred, 11, "xyz"));
}
TEST_F(S3ServiceTest, completeMultiPartRoutine) {
vector<uint8_t> raw;
raw.resize(100);
Response response(RESPONSE_OK, raw);
EXPECT_CALL(mockRestfulService, post(_, _, _, _)).WillOnce(Return(response));
vector<string> etagArray;
etagArray.push_back("\"abc\"");
etagArray.push_back("\"def\"");
vector<string> etagArray = {"\"abc\"", "\"def\""};
EXPECT_TRUE(
this->completeMultiPart("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
......@@ -689,10 +697,7 @@ TEST_F(S3ServiceTest, completeMultiPartFailedResponse) {
Response response(RESPONSE_FAIL, raw);
EXPECT_CALL(mockRestfulService, post(_, _, _, _)).WillRepeatedly(Return(response));
vector<string> etagArray;
etagArray.push_back("\"abc\"");
etagArray.push_back("\"def\"");
vector<string> etagArray = {"\"abc\"", "\"def\""};
EXPECT_THROW(
this->completeMultiPart("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
......@@ -718,13 +723,67 @@ TEST_F(S3ServiceTest, completeMultiPartErrorResponse) {
EXPECT_CALL(mockRestfulService, post(_, _, _, _)).WillRepeatedly(Return(response));
vector<string> etagArray;
etagArray.push_back("\"abc\"");
etagArray.push_back("\"def\"");
vector<string> etagArray = {"\"abc\"", "\"def\""};
EXPECT_THROW(
this->completeMultiPart("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred, "xyz", etagArray),
std::runtime_error);
}
TEST_F(S3ServiceTest, completeMultiPartAbortResponse) {
vector<uint8_t> raw;
raw.resize(100);
Response response(RESPONSE_ABORT, raw);
EXPECT_CALL(mockRestfulService, post(_, _, _, _)).WillRepeatedly(Return(response));
vector<string> etagArray = {"\"abc\"", "\"def\""};
EXPECT_FALSE(
this->completeMultiPart("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred, "xyz", etagArray));
}
TEST_F(S3ServiceTest, abortUploadRoutine) {
vector<uint8_t> raw;
raw.resize(100);
Response response(RESPONSE_OK, raw);
EXPECT_CALL(mockRestfulService, deleteRequest(_, _, _)).WillOnce(Return(response));
EXPECT_TRUE(this->abortUpload("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred, "xyz"));
}
TEST_F(S3ServiceTest, abortUploadFailedResponse) {
vector<uint8_t> raw;
raw.resize(100);
Response response(RESPONSE_FAIL, raw);
EXPECT_CALL(mockRestfulService, deleteRequest(_, _, _)).WillRepeatedly(Return(response));
EXPECT_THROW(this->abortUpload("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred, "xyz"),
std::runtime_error);
}
TEST_F(S3ServiceTest, abortUploadErrorResponse) {
uint8_t xml[] =
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
"<Error>"
"<Code>PermanentRedirect</Code>"
"<Message>The bucket you are attempting to access must be addressed "
"using the specified endpoint. "
"Please send all future requests to this endpoint.</Message>"
"<Bucket>foo</Bucket><Endpoint>s3.amazonaws.com</Endpoint>"
"<RequestId>27DD9B7004AF83E3</RequestId>"
"<HostId>NL3pyGvn+FajhQLKz/"
"hXUzV1VnFbbwNjUQsqWeFiDANkV4EVkh8Kpq5NNAi27P7XDhoA9M9Xhg0=</HostId>"
"</Error>";
vector<uint8_t> raw(xml, xml + sizeof(xml) - 1);
Response response(RESPONSE_ERROR, raw);
EXPECT_CALL(mockRestfulService, deleteRequest(_, _, _)).WillRepeatedly(Return(response));
EXPECT_THROW(this->abortUpload("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred, "xyz"),
std::runtime_error);
}
......@@ -150,3 +150,39 @@ TEST_F(S3KeyWriterTest, TestUploadContent) {
// Buffer is not empty, close() will upload remaining data in buffer.
this->close();
}
TEST_F(S3KeyWriterTest, TestWriteAbortInWriting) {
testParams.setChunkSize(0x100);
char data[0x100];
EXPECT_CALL(this->mocks3interface, getUploadId(_, _, _)).WillOnce(Return("uploadid1"));
EXPECT_CALL(this->mocks3interface, abortUpload(_, _, _, _)).WillOnce(Return(true));
this->open(testParams);
QueryCancelPending = true;
EXPECT_THROW(this->write(data, 0x80), std::runtime_error);
QueryCancelPending = false;
}
TEST_F(S3KeyWriterTest, TestWriteAbortInClosing) {
testParams.setChunkSize(0x100);
char data[0x100];
EXPECT_CALL(this->mocks3interface, getUploadId(_, _, _)).WillOnce(Return("uploadid1"));
EXPECT_CALL(this->mocks3interface, uploadPartOfData(_, _, _, _, 1, "uploadid1"))
.WillOnce(Invoke(MockUploadPartOfData(0x80)));
EXPECT_CALL(this->mocks3interface, abortUpload(_, _, _, _)).WillOnce(Return(true));
this->open(testParams);
ASSERT_EQ(0x80, this->write(data, 0x80));
// Buffer have little space, will uploadData and clear buffer.
ASSERT_EQ(0x81, this->write(data, 0x81));
QueryCancelPending = true;
// Buffer is not empty, close() will upload remaining data in buffer.
EXPECT_THROW(this->close(), std::runtime_error);
QueryCancelPending = false;
}
\ No newline at end of file
......@@ -348,3 +348,19 @@ TEST(S3RESTfulService, DISABLED_PostToDummyServerWithData) {
EXPECT_EQ(RESPONSE_OK, resp.getStatus());
EXPECT_TRUE(compareVector(data, resp.getRawData()));
}
/* Run './bin/dummyHTTPServer.py' before enabling this test */
TEST(S3RESTfulService, DISABLED_DeleteToDummyServerWithData) {
HTTPHeaders headers;
map<string, string> params;
string url;
S3RESTfulService service;
headers.Add(CONTENTTYPE, "text/plain");
headers.Add(CONTENTLENGTH, "0");
url = "http://localhost:8553";
Response resp = service.deleteRequest(url, headers, params);
EXPECT_EQ(RESPONSE_OK, resp.getStatus());
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册