提交 9c3e3125 编写于 作者: A Adam Lee

s3ext: work through multipart uploading

Signed-off-by: NPeifeng Qiu <pqiu@pivotal.io>
上级 9dcc7643
......@@ -128,8 +128,7 @@ class RESTfulService {
const vector<uint8_t>& data) = 0;
virtual Response post(const string& url, HTTPHeaders& headers,
const map<string, string>& params, const string& queryString,
const vector<uint8_t>& data) = 0;
const map<string, string>& params, const vector<uint8_t>& data) = 0;
virtual ResponseCode head(const string& url, HTTPHeaders& headers,
const map<string, string>& params) = 0;
......
......@@ -2,6 +2,7 @@
#define __S3_HTTP_HEADERS_H__
#include <map>
#include <set>
#include <string>
#include <curl/curl.h>
......@@ -33,6 +34,7 @@ class HTTPHeaders {
~HTTPHeaders();
bool Add(HeaderField f, const string& value);
void Disable(HeaderField f);
const char* Get(HeaderField f);
void CreateList();
......@@ -41,7 +43,9 @@ class HTTPHeaders {
private:
struct curl_slist* header_list;
std::map<HeaderField, string> fields;
std::set<HeaderField> disabledFields;
};
const char* GetFieldString(HeaderField f);
......
......@@ -72,8 +72,8 @@ class S3Interface {
throw std::runtime_error("Default implementation must not be called.");
}
virtual uint64_t uploadData(vector<uint8_t>& data, const string& sourceUrl,
const string& region, const S3Credential& cred) {
virtual uint64_t uploadData(vector<uint8_t>& data, const string& keyUrl, const string& region,
const S3Credential& cred) {
throw std::runtime_error("Default implementation must not be called.");
}
......@@ -86,6 +86,23 @@ class S3Interface {
const S3Credential& cred) {
throw std::runtime_error("Default implementation must not be called.");
}
virtual string getUploadId(const string& keyUrl, const string& region,
const S3Credential& cred) {
throw std::runtime_error("Default implementation must not be called.");
}
virtual string uploadPartOfData(vector<uint8_t>& data, const string& keyUrl,
const string& region, const S3Credential& cred,
uint64_t partNumber, const string& uploadId) {
throw std::runtime_error("Default implementation must not be called.");
}
virtual bool completeMultiPart(const string& keyUrl, const string& region,
const S3Credential& cred, const string& uploadId,
const vector<string>& etagArray) {
throw std::runtime_error("Default implementation must not be called.");
}
};
class S3Service : public S3Interface {
......@@ -98,7 +115,7 @@ class S3Service : public S3Interface {
uint64_t fetchData(uint64_t offset, vector<uint8_t>& data, uint64_t len,
const string& sourceUrl, const string& region, const S3Credential& cred);
uint64_t uploadData(vector<uint8_t>& data, const string& sourceUrl, const string& region,
uint64_t uploadData(vector<uint8_t>& data, const string& keyUrl, const string& region,
const S3Credential& cred);
S3CompressionType checkCompressionType(const string& keyUrl, const string& region,
......@@ -106,7 +123,6 @@ class S3Service : public S3Interface {
bool checkKeyExistence(const string& keyUrl, const string& region, const S3Credential& cred);
// following two functions are exposed publicly for UT tests
void setRESTfulService(RESTfulService* restfullService) {
this->restfulService = restfullService;
}
......@@ -121,8 +137,7 @@ class S3Service : public S3Interface {
uint64_t retries = S3_REQUEST_MAX_RETRIES);
Response postResponseWithRetries(const string& url, HTTPHeaders& headers,
const map<string, string>& params, const string& queryString,
const vector<uint8_t>& data,
const map<string, string>& params, const vector<uint8_t>& data,
uint64_t retries = S3_REQUEST_MAX_RETRIES);
ResponseCode headResponseWithRetries(const string& url, HTTPHeaders& headers,
......@@ -131,11 +146,11 @@ class S3Service : public S3Interface {
string getUploadId(const string& keyUrl, const string& region, const S3Credential& cred);
string uploadPartOfData(vector<uint8_t>& data, const string& sourceUrl, const string& region,
string uploadPartOfData(vector<uint8_t>& data, const string& keyUrl, const string& region,
const S3Credential& cred, uint64_t partNumber, const string& uploadId);
bool completeMultiPart(const string& keyUrl, const string& region, const S3Credential& cred,
const string& uploadId, vector<string>& eTagArray);
const string& uploadId, const vector<string>& etagArray);
private:
string getUrl(const string& prefix, const string& schema, const string& host,
......
......@@ -38,6 +38,8 @@ class S3KeyWriter : Writer {
}
protected:
void flushBuffer();
WriterBuffer buffer;
S3Interface* s3interface;
......
......@@ -16,7 +16,7 @@ class S3RESTfulService : public RESTfulService {
const vector<uint8_t>& data);
Response post(const string& url, HTTPHeaders& headers, const map<string, string>& params,
const string& queryString, const vector<uint8_t>& data);
const vector<uint8_t>& data);
};
#endif /* INCLUDE_S3RESTFUL_SERVICE_H_ */
......@@ -32,6 +32,10 @@ bool HTTPHeaders::Add(HeaderField f, const string &v) {
}
}
void HTTPHeaders::Disable(HeaderField f) {
this->disabledFields.insert(f);
}
const char *HTTPHeaders::Get(HeaderField f) {
return this->fields[f].empty() ? NULL : this->fields[f].c_str();
}
......@@ -47,6 +51,13 @@ void HTTPHeaders::CreateList() {
headers = curl_slist_append(headers, sstr.str().c_str());
}
std::set<HeaderField>::iterator it2;
for (it2 = this->disabledFields.begin(); it2 != this->disabledFields.end(); it2++) {
std::stringstream sstr;
sstr << GetFieldString(*it2) << ":";
headers = curl_slist_append(headers, sstr.str().c_str());
}
this->header_list = headers;
}
......
......@@ -79,11 +79,10 @@ Response S3Service::putResponseWithRetries(const string &url, HTTPHeaders &heade
Response S3Service::postResponseWithRetries(const string &url, HTTPHeaders &headers,
const map<string, string> &params,
const string &queryString, const vector<uint8_t> &data,
uint64_t retries) {
const vector<uint8_t> &data, uint64_t retries) {
while (retries--) {
// declare response here to leverage RVO (Return Value Optimization)
Response response = this->restfulService->post(url, headers, params, queryString, data);
Response response = this->restfulService->post(url, headers, params, data);
if (response.isSuccess() || (retries == 0)) {
return response;
};
......@@ -395,14 +394,14 @@ uint64_t S3Service::fetchData(uint64_t offset, vector<uint8_t> &data, uint64_t l
parseXMLMessage(xmlContext, "Message").c_str());
}
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
return 0;
} else {
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
......@@ -410,11 +409,11 @@ uint64_t S3Service::fetchData(uint64_t offset, vector<uint8_t> &data, uint64_t l
}
}
uint64_t S3Service::uploadData(vector<uint8_t> &data, const string &sourceUrl, const string &region,
uint64_t S3Service::uploadData(vector<uint8_t> &data, const string &keyUrl, const string &region,
const S3Credential &cred) {
HTTPHeaders headers;
map<string, string> params;
UrlParser parser(sourceUrl);
UrlParser parser(keyUrl);
headers.Add(HOST, parser.getHost());
headers.Add(X_AMZ_CONTENT_SHA256, "UNSIGNED-PAYLOAD");
......@@ -424,7 +423,7 @@ uint64_t S3Service::uploadData(vector<uint8_t> &data, const string &sourceUrl, c
SignRequestV4("PUT", &headers, region, parser.getPath(), "", cred);
Response resp = this->putResponseWithRetries(sourceUrl, headers, params, data);
Response resp = this->putResponseWithRetries(keyUrl, headers, params, data);
if (resp.getStatus() == RESPONSE_OK) {
return data.size();
} else if (resp.getStatus() == RESPONSE_ERROR) {
......@@ -435,16 +434,16 @@ uint64_t S3Service::uploadData(vector<uint8_t> &data, const string &sourceUrl, c
parseXMLMessage(xmlContext, "Message").c_str());
}
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
CHECK_OR_DIE_MSG(false, "Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
return 0;
} else {
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
CHECK_OR_DIE_MSG(false, "Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
return 0;
}
......@@ -487,7 +486,7 @@ S3CompressionType S3Service::checkCompressionType(const string &keyUrl, const st
parseXMLMessage(xmlContext, "Message").c_str());
}
}
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
CHECK_OR_DIE_MSG(false, "Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
}
......@@ -513,18 +512,18 @@ string S3Service::getUploadId(const string &keyUrl, const string &region,
HTTPHeaders headers;
map<string, string> params;
UrlParser parser(keyUrl);
stringstream pathWithQuery;
headers.Add(HOST, parser.getHost());
headers.Add(CONTENTTYPE, "application/x-www-form-urlencoded");
headers.Disable(CONTENTTYPE);
headers.Disable(CONTENTLENGTH);
headers.Add(X_AMZ_CONTENT_SHA256, "UNSIGNED-PAYLOAD");
pathWithQuery << parser.getPath() << "?uploads";
SignRequestV4("POST", &headers, region, parser.getPath(), "uploads=", cred);
SignRequestV4("POST", &headers, region, pathWithQuery.str(), "", cred);
stringstream urlWithQuery;
urlWithQuery << keyUrl << "?uploads";
Response resp =
this->postResponseWithRetries(keyUrl, headers, params, "uploads", vector<uint8_t>());
Response resp = this->postResponseWithRetries(urlWithQuery.str(), headers, params, vector<uint8_t>());
if (resp.getStatus() == RESPONSE_OK) {
xmlParserCtxtPtr xmlContext = getXMLContext(resp);
if (xmlContext != NULL) {
......@@ -539,14 +538,14 @@ string S3Service::getUploadId(const string &keyUrl, const string &region,
parseXMLMessage(xmlContext, "Message").c_str());
}
S3ERROR("Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
return "";
} else {
S3ERROR("Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
......@@ -556,13 +555,13 @@ string S3Service::getUploadId(const string &keyUrl, const string &region,
return "";
}
string S3Service::uploadPartOfData(vector<uint8_t> &data, const string &sourceUrl,
string S3Service::uploadPartOfData(vector<uint8_t> &data, const string &keyUrl,
const string &region, const S3Credential &cred,
uint64_t partNumber, const string &uploadId) {
HTTPHeaders headers;
map<string, string> params;
UrlParser parser(sourceUrl);
stringstream pathWithQuery;
UrlParser parser(keyUrl);
stringstream queryString;
headers.Add(HOST, parser.getHost());
headers.Add(X_AMZ_CONTENT_SHA256, "UNSIGNED-PAYLOAD");
......@@ -570,12 +569,12 @@ string S3Service::uploadPartOfData(vector<uint8_t> &data, const string &sourceUr
headers.Add(CONTENTTYPE, "text/plain");
headers.Add(CONTENTLENGTH, std::to_string((unsigned long long)data.size()));
pathWithQuery << parser.getPath() << "?partNumber=" << partNumber << "&uploadId=" << uploadId;
queryString << "partNumber=" << partNumber << "&uploadId=" << uploadId;
SignRequestV4("PUT", &headers, region, pathWithQuery.str(), "", cred);
SignRequestV4("PUT", &headers, region, parser.getPath(), queryString.str(), cred);
stringstream urlWithQuery;
urlWithQuery << sourceUrl << "?partNumber=" << partNumber << "&uploadId=" << uploadId;
urlWithQuery << keyUrl << "?partNumber=" << partNumber << "&uploadId=" << uploadId;
Response resp = this->putResponseWithRetries(urlWithQuery.str(), headers, params, data);
if (resp.getStatus() == RESPONSE_OK) {
......@@ -596,16 +595,16 @@ string S3Service::uploadPartOfData(vector<uint8_t> &data, const string &sourceUr
parseXMLMessage(xmlContext, "Message").c_str());
}
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
return "";
} else {
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
return "";
}
......@@ -613,11 +612,11 @@ string S3Service::uploadPartOfData(vector<uint8_t> &data, const string &sourceUr
bool S3Service::completeMultiPart(const string &keyUrl, const string &region,
const S3Credential &cred, const string &uploadId,
vector<string> &etagArray) {
const vector<string> &etagArray) {
HTTPHeaders headers;
map<string, string> params;
UrlParser parser(keyUrl);
stringstream pathWithQuery;
stringstream queryString;
stringstream body;
......@@ -633,17 +632,17 @@ bool S3Service::completeMultiPart(const string &keyUrl, const string &region,
headers.Add(X_AMZ_CONTENT_SHA256, "UNSIGNED-PAYLOAD");
headers.Add(CONTENTLENGTH, std::to_string((unsigned long long)body.str().length()));
pathWithQuery << parser.getPath() << "?uploadId" << uploadId;
queryString << "uploadId=" << uploadId;
SignRequestV4("POST", &headers, region, pathWithQuery.str(), "", cred);
SignRequestV4("POST", &headers, region, parser.getPath(), queryString.str(), cred);
stringstream urlWithQuery;
urlWithQuery << keyUrl << "?uploadId" << uploadId;
urlWithQuery << keyUrl << "?uploadId=" << uploadId;
string bodyString = body.str();
Response resp =
this->postResponseWithRetries(urlWithQuery.str(), headers, params, "",
vector<uint8_t>(bodyString.begin(), bodyString.end()));
Response resp = this->postResponseWithRetries(
urlWithQuery.str(), headers, params, vector<uint8_t>(bodyString.begin(), bodyString.end()));
if (resp.getStatus() == RESPONSE_OK) {
return true;
} else if (resp.getStatus() == RESPONSE_ERROR) {
......@@ -654,16 +653,16 @@ bool S3Service::completeMultiPart(const string &keyUrl, const string &region,
parseXMLMessage(xmlContext, "Message").c_str());
}
S3ERROR("Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
CHECK_OR_DIE_MSG(false, "Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
return false;
} else {
S3ERROR("Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
S3ERROR("Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
CHECK_OR_DIE_MSG(false, "Failed to request: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
return false;
}
......
......@@ -27,8 +27,7 @@ uint64_t S3KeyWriter::write(char *buf, uint64_t count) {
}
if (this->buffer.size() + count > this->chunkSize) {
this->s3interface->uploadData(this->buffer, this->url, this->region, this->cred);
this->buffer.clear();
flushBuffer();
}
this->buffer.insert(this->buffer.end(), buf, buf + count);
......@@ -38,8 +37,17 @@ uint64_t S3KeyWriter::write(char *buf, uint64_t count) {
// This should be reentrant, has no side effects when called multiple times.
void S3KeyWriter::close() {
flushBuffer();
}
void S3KeyWriter::flushBuffer() {
if (!this->buffer.empty()) {
this->s3interface->uploadData(this->buffer, this->url, this->region, this->cred);
string id = this->s3interface->getUploadId(this->url, this->region, this->cred);
string etag =
this->s3interface->uploadPartOfData(buffer, this->url, this->region, this->cred, 1, id);
this->s3interface->completeMultiPart(this->url, this->region, this->cred, id, {etag});
this->buffer.clear();
}
}
......@@ -219,8 +219,7 @@ Response S3RESTfulService::put(const string &url, HTTPHeaders &headers,
}
Response S3RESTfulService::post(const string &url, HTTPHeaders &headers,
const map<string, string> &params, const string &queryString,
const vector<uint8_t> &data) {
const map<string, string> &params, const vector<uint8_t> &data) {
Response response;
CURL *curl = curl_easy_init();
......@@ -237,15 +236,12 @@ Response S3RESTfulService::post(const string &url, HTTPHeaders &headers,
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, RESTfulServiceWriteFuncCallback);
curl_easy_setopt(curl, CURLOPT_POST, 1L);
if (!queryString.empty()) {
curl_easy_setopt(curl, CURLOPT_POSTFIELDS, queryString.c_str());
curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, (long)queryString.length());
}
if (!data.empty()) {
UploadData uploadData(data);
curl_easy_setopt(curl, CURLOPT_READDATA, (void *)&uploadData);
curl_easy_setopt(curl, CURLOPT_READFUNCTION, RESTfulServiceReadFuncCallback);
/* CURLOPT_INFILESIZE_LARGE for sending files larger than 2GB.*/
curl_easy_setopt(curl, CURLOPT_INFILESIZE_LARGE, (curl_off_t)data.size());
}
......
......@@ -22,7 +22,7 @@ class MockS3Interface : public S3Interface {
uint64_t(uint64_t offset, vector<uint8_t>& data, uint64_t len, const string &sourceUrl,
const string &region, const S3Credential &cred));
MOCK_METHOD4(uploadData, uint64_t(vector<uint8_t>& data, const string& sourceUrl,
MOCK_METHOD4(uploadData, uint64_t(vector<uint8_t>& data, const string& keyUrl,
const string& region, const S3Credential& cred));
MOCK_METHOD3(checkCompressionType, S3CompressionType(const string& keyUrl, const string& region,
......@@ -30,6 +30,14 @@ class MockS3Interface : public S3Interface {
MOCK_METHOD3(checkKeyExistence, bool(const string& keyUrl, const string& region,
const S3Credential& cred));
MOCK_METHOD3(getUploadId, string(const string& keyUrl, const string& region, const S3Credential& cred));
MOCK_METHOD6(uploadPartOfData, string(vector<uint8_t>& data, const string& keyUrl, const string& region,
const S3Credential& cred, uint64_t partNumber, const string& uploadId));
MOCK_METHOD5(completeMultiPart, bool(const string& keyUrl, const string& region, const S3Credential& cred,
const string& uploadId, const vector<string>& etagArray));
};
class MockS3RESTfulService : public S3RESTfulService {
......@@ -43,9 +51,8 @@ class MockS3RESTfulService : public S3RESTfulService {
MOCK_METHOD4(put, Response(const string &url, HTTPHeaders &headers,
const map<string, string> &params, const vector<uint8_t> &data));
MOCK_METHOD5(post, Response(const string &url, HTTPHeaders &headers,
const map<string, string> &params, const string &queryString,
const vector<uint8_t> &data));
MOCK_METHOD4(post, Response(const string &url, HTTPHeaders &headers,
const map<string, string> &params,const vector<uint8_t> &data));
};
class XMLGenerator {
......
......@@ -21,27 +21,53 @@ TEST(Common, HTTPHeaders) {
#define HOSTSTR "www.google.com"
#define RANGESTR "1-10000"
#define MD5STR "xxxxxxxxxxxxxxxxxxx"
HTTPHeaders *h = new HTTPHeaders();
ASSERT_NE((void *)NULL, h);
HTTPHeaders headers;
h->CreateList();
curl_slist *l = h->GetList();
EXPECT_EQ((void *)NULL, l);
h->FreeList();
headers.CreateList();
curl_slist *headersList = headers.GetList();
EXPECT_EQ((void *)NULL, headersList);
headers.FreeList();
ASSERT_FALSE(h->Add(HOST, ""));
ASSERT_TRUE(h->Add(HOST, HOSTSTR));
ASSERT_TRUE(h->Add(RANGE, RANGESTR));
ASSERT_TRUE(h->Add(CONTENTMD5, MD5STR));
ASSERT_FALSE(headers.Add(HOST, ""));
ASSERT_TRUE(headers.Add(HOST, HOSTSTR));
ASSERT_TRUE(headers.Add(RANGE, RANGESTR));
ASSERT_TRUE(headers.Add(CONTENTMD5, MD5STR));
EXPECT_STREQ(HOSTSTR, h->Get(HOST));
EXPECT_STREQ(RANGESTR, h->Get(RANGE));
EXPECT_STREQ(MD5STR, h->Get(CONTENTMD5));
EXPECT_STREQ(HOSTSTR, headers.Get(HOST));
EXPECT_STREQ(RANGESTR, headers.Get(RANGE));
EXPECT_STREQ(MD5STR, headers.Get(CONTENTMD5));
h->CreateList();
l = h->GetList();
ASSERT_NE((void *)NULL, l);
h->FreeList();
headers.CreateList();
headersList = headers.GetList();
ASSERT_NE((void *)NULL, headersList);
delete h;
EXPECT_STREQ(headersList->data, "Host: www.google.com");
headersList = headersList->next;
EXPECT_STREQ(headersList->data, "Range: 1-10000");
headersList = headersList->next;
EXPECT_STREQ(headersList->data, "Content-MD5: xxxxxxxxxxxxxxxxxxx");
headersList = headersList->next;
EXPECT_EQ((void *)NULL, headersList);
headers.FreeList();
}
TEST(Common, HTTPHeadersDisable) {
HTTPHeaders headers;
headers.Disable(CONTENTLENGTH);
headers.CreateList();
curl_slist *headersList = headers.GetList();
ASSERT_NE((void *)NULL, headersList);
EXPECT_STREQ(headersList->data, "Content-Length:");
headersList = headersList->next;
EXPECT_EQ((void *)NULL, headersList);
headers.FreeList();
}
......@@ -182,7 +182,7 @@ TEST_F(S3ServiceTest, PostResponseWithZeroRetry) {
vector<uint8_t> data;
EXPECT_EQ(RESPONSE_FAIL,
this->postResponseWithRetries(url, headers, params, "", data, 0).getStatus());
this->postResponseWithRetries(url, headers, params, data, 0).getStatus());
}
TEST_F(S3ServiceTest, PostResponseWithTwoRetries) {
......@@ -191,13 +191,13 @@ TEST_F(S3ServiceTest, PostResponseWithTwoRetries) {
map<string, string> params;
vector<uint8_t> data;
EXPECT_CALL(mockRestfulService, post(_, _, _, _, _))
EXPECT_CALL(mockRestfulService, post(_, _, _, _))
.Times(2)
.WillOnce(Return(response))
.WillOnce(Return(response));
EXPECT_EQ(RESPONSE_FAIL,
this->postResponseWithRetries(url, headers, params, "", data, 2).getStatus());
this->postResponseWithRetries(url, headers, params, data, 2).getStatus());
}
TEST_F(S3ServiceTest, PostResponseWithRetriesAndSuccess) {
......@@ -209,13 +209,12 @@ TEST_F(S3ServiceTest, PostResponseWithRetriesAndSuccess) {
Response responseSuccess;
responseSuccess.setStatus(RESPONSE_OK);
EXPECT_CALL(mockRestfulService, post(_, _, _, _, _))
EXPECT_CALL(mockRestfulService, post(_, _, _, _))
.Times(2)
.WillOnce(Return(response))
.WillOnce(Return(responseSuccess));
EXPECT_EQ(RESPONSE_OK,
this->postResponseWithRetries(url, headers, params, "", data).getStatus());
EXPECT_EQ(RESPONSE_OK, this->postResponseWithRetries(url, headers, params, data).getStatus());
}
TEST_F(S3ServiceTest, ListBucketThrowExceptionWhenBucketStringIsEmpty) {
......@@ -630,8 +629,7 @@ TEST_F(S3ServiceTest, getUploadIdRoutine) {
vector<uint8_t> raw(xml, xml + sizeof(xml) - 1);
Response response(RESPONSE_OK, raw);
EXPECT_CALL(mockRestfulService, post(_, _, _, "uploads", vector<uint8_t>()))
.WillOnce(Return(response));
EXPECT_CALL(mockRestfulService, post(_, _, _, vector<uint8_t>())).WillOnce(Return(response));
EXPECT_EQ("VXBsb2FkIElEIGZvciA2aWWpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZA",
this->getUploadId("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
......@@ -643,7 +641,7 @@ TEST_F(S3ServiceTest, getUploadIdFailedResponse) {
raw.resize(100);
Response response(RESPONSE_FAIL, raw);
EXPECT_CALL(mockRestfulService, post(_, _, _, "uploads", vector<uint8_t>()))
EXPECT_CALL(mockRestfulService, post(_, _, _, vector<uint8_t>()))
.WillRepeatedly(Return(response));
EXPECT_THROW(this->getUploadId("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
......@@ -667,7 +665,7 @@ TEST_F(S3ServiceTest, getUploadIdErrorResponse) {
vector<uint8_t> raw(xml, xml + sizeof(xml) - 1);
Response response(RESPONSE_ERROR, raw);
EXPECT_CALL(mockRestfulService, post(_, _, _, "uploads", vector<uint8_t>()))
EXPECT_CALL(mockRestfulService, post(_, _, _, vector<uint8_t>()))
.WillRepeatedly(Return(response));
EXPECT_THROW(this->getUploadId("https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
......@@ -739,7 +737,7 @@ TEST_F(S3ServiceTest, completeMultiPartRoutine) {
vector<uint8_t> raw;
raw.resize(100);
Response response(RESPONSE_OK, raw);
EXPECT_CALL(mockRestfulService, post(_, _, _, "", _)).WillOnce(Return(response));
EXPECT_CALL(mockRestfulService, post(_, _, _, _)).WillOnce(Return(response));
vector<string> etagArray;
......@@ -755,7 +753,7 @@ TEST_F(S3ServiceTest, completeMultiPartFailedResponse) {
vector<uint8_t> raw;
raw.resize(100);
Response response(RESPONSE_FAIL, raw);
EXPECT_CALL(mockRestfulService, post(_, _, _, "", _)).WillRepeatedly(Return(response));
EXPECT_CALL(mockRestfulService, post(_, _, _, _)).WillRepeatedly(Return(response));
vector<string> etagArray;
......@@ -781,7 +779,7 @@ TEST_F(S3ServiceTest, completeMultiPartErrorResponse) {
vector<uint8_t> raw(xml, xml + sizeof(xml) - 1);
Response response(RESPONSE_ERROR, raw);
EXPECT_CALL(mockRestfulService, post(_, _, _, "", _)).WillRepeatedly(Return(response));
EXPECT_CALL(mockRestfulService, post(_, _, _, _)).WillRepeatedly(Return(response));
vector<string> etagArray;
......
......@@ -63,7 +63,10 @@ TEST_F(S3KeyWriterTest, TestSmallWrite) {
testParams.setChunkSize(0x100);
char data[0x10];
EXPECT_CALL(this->mocks3interface, uploadData(_, _, _, _)).WillOnce(Return(sizeof(data)));
EXPECT_CALL(this->mocks3interface, getUploadId(_, _, _)).WillOnce(Return("uploadid"));
EXPECT_CALL(this->mocks3interface, uploadPartOfData(_, _, _, _, _, _))
.WillOnce(Return("\"etag\""));
EXPECT_CALL(this->mocks3interface, completeMultiPart(_, _, _, _, _)).WillOnce(Return(true));
this->open(testParams);
ASSERT_EQ(sizeof(data), this->write(data, sizeof(data)));
......@@ -81,13 +84,39 @@ TEST_F(S3KeyWriterTest, TestSmallChunkSize) {
EXPECT_THROW(this->write(data, sizeof(data)), std::runtime_error);
}
class MockUploadPartOfData {
public:
MockUploadPartOfData(uint64_t expectedLen) : expectedLength(expectedLen) {
}
MockUploadPartOfData(const MockUploadPartOfData &other) {
expectedLength = other.expectedLength;
}
string operator()(vector<uint8_t> &data, const string &keyUrl, const string &region,
const S3Credential &cred, uint64_t partNumber, const string &uploadId) {
EXPECT_EQ(data.size(), expectedLength);
return "\"etag\"";
}
private:
uint64_t expectedLength;
};
TEST_F(S3KeyWriterTest, TestBufferedWrite) {
testParams.setChunkSize(0x100);
char data[0x100];
EXPECT_CALL(this->mocks3interface, uploadData(_, _, _, _))
.WillOnce(Return(0x80))
.WillOnce(Return(0x81));
EXPECT_CALL(this->mocks3interface, getUploadId(_, _, _))
.WillOnce(Return("uploadid1"))
.WillOnce(Return("uploadid2"));
EXPECT_CALL(this->mocks3interface, uploadPartOfData(_, _, _, _, 1, "uploadid1"))
.WillOnce(Invoke(MockUploadPartOfData(0x80)));
EXPECT_CALL(this->mocks3interface, uploadPartOfData(_, _, _, _, 1, "uploadid2"))
.WillOnce(Invoke(MockUploadPartOfData(0x81)));
EXPECT_CALL(this->mocks3interface, completeMultiPart(_, _, _, _, _))
.WillOnce(Return(true))
.WillOnce(Return(true));
this->open(testParams);
ASSERT_EQ(0x80, this->write(data, 0x80));
......@@ -103,7 +132,10 @@ TEST_F(S3KeyWriterTest, TestUploadContent) {
testParams.setChunkSize(0x100);
char data[] = "The quick brown fox jumps over the lazy dog";
EXPECT_CALL(this->mocks3interface, uploadData(_, _, _, _)).WillOnce(Return(sizeof(data)));
EXPECT_CALL(this->mocks3interface, getUploadId(_, _, _)).WillOnce(Return("uploadid"));
EXPECT_CALL(this->mocks3interface, uploadPartOfData(_, _, _, _, _, _))
.WillOnce(Return("\"etag\""));
EXPECT_CALL(this->mocks3interface, completeMultiPart(_, _, _, _, _)).WillOnce(Return(true));
this->open(testParams);
ASSERT_EQ(sizeof(data), this->write(data, sizeof(data)));
......
......@@ -240,9 +240,8 @@ TEST(S3RESTfulService, PostWithoutURL) {
map<string, string> params;
string url;
S3RESTfulService service;
string query;
Response resp = service.post(url, headers, params, query, vector<uint8_t>());
Response resp = service.post(url, headers, params, vector<uint8_t>());
EXPECT_EQ(RESPONSE_FAIL, resp.getStatus());
EXPECT_EQ("Failed to talk to s3 service URL using bad/illegal format or missing URL",
......@@ -256,10 +255,10 @@ TEST(S3RESTfulService, PostToServerWithBlindPutServiceAndDebugParam) {
string url;
S3RESTfulService service;
string query = "abcdefghij";
url = "https://www.bing.com";
headers.Add(CONTENTLENGTH, "3");
url = "https://www.bing.com/?abcdefghij";
Response resp = service.post(url, headers, params, query, vector<uint8_t>());
Response resp = service.post(url, headers, params, vector<uint8_t>({1, 2, 3}));
EXPECT_EQ(RESPONSE_OK, resp.getStatus());
}
......@@ -270,10 +269,10 @@ TEST(S3RESTfulService, PostToServerWithBlindPutService) {
string url;
S3RESTfulService service;
string query = "abcdefghij";
url = "https://www.bing.com";
url = "https://www.bing.com/?abcdefghij";
Response resp = service.post(url, headers, params, query, vector<uint8_t>());
headers.Add(CONTENTLENGTH, "3");
Response resp = service.post(url, headers, params, vector<uint8_t>({1, 2, 3}));
EXPECT_EQ(RESPONSE_OK, resp.getStatus());
}
......@@ -284,11 +283,10 @@ TEST(S3RESTfulService, PostToServerWith404Page) {
string url;
S3RESTfulService service;
string query = "abcdefghij";
url = "https://www.bing.com/pivotal.html";
url = "https://www.bing.com/pivotal.html/?abcdefghij";
Response resp = service.post(url, headers, params, query, vector<uint8_t>());
headers.Add(CONTENTLENGTH, "3");
Response resp = service.post(url, headers, params, vector<uint8_t>({1, 2, 3}));
EXPECT_EQ(RESPONSE_ERROR, resp.getStatus());
EXPECT_EQ("S3 server returned error, error code is 404", resp.getMessage());
......@@ -310,7 +308,7 @@ TEST(S3RESTfulService, PostToServerWithData) {
headers.Add(CONTENTTYPE, "text/plain");
headers.Add(CONTENTLENGTH, std::to_string(data.size()));
Response resp = service.post(url, headers, params, "", data);
Response resp = service.post(url, headers, params, data);
EXPECT_EQ(RESPONSE_OK, resp.getStatus());
}
......@@ -322,13 +320,11 @@ TEST(S3RESTfulService, DISABLED_PostToDummyServer) {
string url;
S3RESTfulService service;
string query = "abcdefghij";
url = "http://localhost:8553";
url = "http://localhost:8553/?abcdefghij";
Response resp = service.post(url, headers, params, query, vector<uint8_t>());
Response resp = service.post(url, headers, params, vector<uint8_t>());
EXPECT_EQ(RESPONSE_OK, resp.getStatus());
EXPECT_EQ(query, string(resp.getRawData().begin(), resp.getRawData().end()));
EXPECT_EQ("abcdefghij", string(resp.getRawData().begin(), resp.getRawData().end()));
}
/* Run './bin/dummyHTTPServer.py' before enabling this test */
......@@ -348,7 +344,7 @@ TEST(S3RESTfulService, DISABLED_PostToDummyServerWithData) {
url = "http://localhost:8553";
Response resp = service.post(url, headers, params, "", data);
Response resp = service.post(url, headers, params, data);
EXPECT_EQ(RESPONSE_OK, resp.getStatus());
EXPECT_TRUE(compareVector(data, resp.getRawData()));
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册