提交 524dcd0f 编写于 作者: A Adam Lee

s3ext: refactor s3key_reader codes

Separate key downloading and decompressing functions, make it more
flexible and robust.
Signed-off-by: NAdam Lee <ali@pivotal.io>
Signed-off-by: NPeifeng Qiu <pqiu@pivotal.io>
上级 023134e4
......@@ -16,7 +16,7 @@ endif
# Targets
MODULE_big = gps3ext
OBJS = lib/http_parser.o lib/ini.o src/gps3ext.o src/s3conf.o src/s3common.o src/gpreader.o src/s3utils.o src/s3log.o src/s3url_parser.o src/s3http_headers.o src/s3thread.o src/s3extbase.o src/s3reader.o src/s3interface.o src/s3restful_service.o src/uncompress_reader.o
OBJS = lib/http_parser.o lib/ini.o src/gps3ext.o src/s3conf.o src/s3common.o src/gpreader.o src/s3utils.o src/s3log.o src/s3url_parser.o src/s3http_headers.o src/s3thread.o src/s3extbase.o src/s3interface.o src/s3restful_service.o src/uncompress_reader.o src/s3key_reader.o
# Launch
PGXS := $(shell pg_config --pgxs)
......@@ -34,7 +34,7 @@ tags:
test: format
@make -f Makefile.others test
coverage:
coverage: format
@make -f Makefile.others coverage
.PHONY: format lint tags test coverage
......@@ -16,7 +16,7 @@ endif
# Targets
PROGRAM = gpcheckcloud
OBJS = src/gpcheckcloud.o src/s3conf.o src/gpreader.o src/s3utils.o src/s3log.o src/s3common.o lib/http_parser.o lib/ini.o src/s3url_parser.o src/s3http_headers.o src/s3thread.o src/s3extbase.o src/s3reader.o src/s3interface.o src/s3restful_service.o src/uncompress_reader.o
OBJS = src/gpcheckcloud.o src/s3conf.o src/gpreader.o src/s3utils.o src/s3log.o src/s3common.o lib/http_parser.o lib/ini.o src/s3url_parser.o src/s3http_headers.o src/s3thread.o src/s3extbase.o src/s3interface.o src/s3restful_service.o src/uncompress_reader.o src/s3key_reader.o
# Launch
PGXS := $(shell pg_config --pgxs)
......
# Options
DEBUG_S3_CURL = n
ARCH=$(shell uname -s)
GCOV=gcov
ARCH = $(shell uname -s)
# Flags
CPP = g++
......@@ -11,10 +10,8 @@ CPPFLAGS = -O2 -g3 -std=c++98 -Wall -fPIC -DS3_STANDALONE -fprofile-arcs -ftest-
ifeq "$(ARCH)" "Darwin"
LDFLAGS += -coverage
GCOV += -a
else
LDFLAGS += -lgcov
GCOV += -r
endif
ifeq ($(DEBUG_S3_CURL),y)
......@@ -24,7 +21,7 @@ endif
all: test
# Google TEST
TEST_SRC_FILES = test/s3conf_test.cpp test/s3utils_test.cpp test/gpreader_test.cpp test/s3common_test.cpp test/s3log_test.cpp test/s3url_parser_test.cpp test/s3http_headers_test.cpp test/s3thread_test.cpp test/s3reader_test.cpp test/s3bucket_reader_test.cpp test/s3interface_test.cpp test/s3restful_service_test.cpp test/uncompress_reader_test.cpp
TEST_SRC_FILES = test/s3conf_test.cpp test/s3utils_test.cpp test/s3common_test.cpp test/s3log_test.cpp test/s3url_parser_test.cpp test/s3http_headers_test.cpp test/s3thread_test.cpp test/s3bucket_reader_test.cpp test/s3interface_test.cpp test/s3restful_service_test.cpp test/uncompress_reader_test.cpp test/s3key_reader_test.cpp
TEST_OBJS = $(TEST_SRC_FILES:.cpp=.o)
TEST_APP = s3test
......@@ -38,7 +35,7 @@ $(TEST_OBJS) gtest_main.o gtest-all.o gmock-all.o: INCLUDES += -I$(GTEST_DIR)/ -
gmock-all.o :
$(CXX) $(INCLUDES) $(CXXFLAGS) -c $(GMOCK_DIR)/src/gmock-all.cc
gtest-all.o :
$(CXX) $(INCLUDES) $(CXXFLAGS) -c $(GTEST_DIR)/src/gtest-all.cc
......@@ -62,7 +59,7 @@ test: $(TEST_APP)
-@./$(TEST_APP)
coverage: test
@$(GCOV) $(TEST_SRC_FILES) | grep -v test.cpp | grep -A 2 "src/.*.cpp"
@gcov $(TEST_SRC_FILES) | grep -A 1 "src/.*.cpp"
clean:
rm -f *.gcov src/*.gcov src/*.gcda src/*.gcno
......
......@@ -8,8 +8,8 @@
#include "gpreader.h"
#include "s3common.h"
#include "s3conf.h"
#include "s3key_reader.h"
#include "s3log.h"
#include "s3reader.h"
#include "s3thread.h"
#define BUF_SIZE 64 * 1024
......
......@@ -4,7 +4,8 @@
#include <string>
#include "s3extbase.h"
#include "s3reader.h"
#include "s3interface.h"
#include "s3key_reader.h"
using std::string;
......
......@@ -44,20 +44,20 @@ class ReaderParams {
this->region = region;
}
uint64_t getSize() const {
return size;
uint64_t getKeySize() const {
return keySize;
}
void setSize(uint64_t size) {
this->size = size;
void setKeySize(uint64_t size) {
this->keySize = size;
}
const string& getUrl() const {
return url;
const string& getUrlToLoad() const {
return urlToLoad;
}
void setUrl(const string& url) {
this->url = url;
void setUrlToLoad(const string& url) {
this->urlToLoad = url;
}
int getSegId() const {
......@@ -76,12 +76,21 @@ class ReaderParams {
this->segNum = segNum;
}
uint8_t getNumOfChunks() const {
return numOfChunks;
}
void setNumOfChunks(uint8_t numOfChunks) {
this->numOfChunks = numOfChunks;
}
private:
string url; // original url to read/write.
string keyUrl; // key url in s3 bucket.
string urlToLoad; // original url to read/write.
string keyUrl; // key url in s3 bucket.
string region;
uint64_t size; // key/file size.
uint64_t chunkSize; // chunk size
uint64_t keySize; // key/file size.
uint64_t chunkSize; // chunk size
uint8_t numOfChunks; // number of chunks(threads).
S3Credential cred;
int segId;
int segNum;
......
......@@ -65,11 +65,13 @@ class Response {
class RESTfulService {
public:
RESTfulService();
virtual ~RESTfulService();
RESTfulService() {
}
virtual ~RESTfulService() {
}
virtual Response get(const string& url, HTTPHeaders& headers,
const map<string, string>& params) = 0;
};
#endif /* INCLUDE_RESTFUL_SERVICE_H_ */
\ No newline at end of file
#endif /* INCLUDE_RESTFUL_SERVICE_H_ */
......@@ -5,7 +5,7 @@
#include "reader.h"
#include "s3interface.h"
#include "s3reader.h"
#include "s3key_reader.h"
using std::string;
......@@ -19,7 +19,10 @@ class S3BucketReader : public Reader {
uint64_t read(char *buf, uint64_t count);
void close();
void setS3interface(S3Interface *s3);
void setS3interface(S3Interface *s3) {
this->s3interface = s3;
}
void setUpstreamReader(Reader *reader) {
this->upstreamReader = reader;
}
......@@ -56,7 +59,7 @@ class S3BucketReader : public Reader {
void SetRegion();
void SetBucketAndPrefix();
BucketContent *getNextKey();
const ReaderParams &getReaderParams(BucketContent *key);
ReaderParams getReaderParams(BucketContent *key);
};
#endif
\ No newline at end of file
#endif
......@@ -2,7 +2,38 @@
#define INCLUDE_S3INTERFACE_H_
#include "restful_service.h"
#include "s3reader.h"
struct BucketContent {
BucketContent() : name(""), size(0) {
}
BucketContent(string name, uint64_t size) {
this->name = name;
this->size = size;
}
~BucketContent() {
}
string getName() const {
return this->name;
};
uint64_t getSize() const {
return this->size;
};
string name;
uint64_t size;
};
// To avoid double delete and core dump, always use new to create
// ListBucketResult object,
// unless we upgrade to c++11. Reason is we delete ListBucketResult in close()
// explicitly.
struct ListBucketResult {
string Name;
string Prefix;
vector<BucketContent*> contents;
~ListBucketResult();
};
class S3Interface {
public:
......@@ -10,18 +41,28 @@ class S3Interface {
}
// It is caller's responsibility to free returned memory.
virtual ListBucketResult* ListBucket(const string& schema, const string& region,
virtual ListBucketResult* listBucket(const string& schema, const string& region,
const string& bucket, const string& prefix,
const S3Credential& cred) = 0;
const S3Credential& cred) {
throw std::runtime_error("Default implementation must not be called.");
}
virtual uint64_t fetchData(uint64_t offset, char* data, uint64_t len, const string& sourceUrl,
const string& region, const S3Credential& cred) {
throw std::runtime_error("Default implementation must not be called.");
}
};
class S3Service : public S3Interface {
public:
S3Service();
virtual ~S3Service();
ListBucketResult* ListBucket(const string& schema, const string& region, const string& bucket,
ListBucketResult* listBucket(const string& schema, const string& region, const string& bucket,
const string& prefix, const S3Credential& cred);
uint64_t fetchData(uint64_t offset, char* data, uint64_t len, const string& sourceUrl,
const string& region, const S3Credential& cred);
void setRESTfulService(RESTfulService* service) {
this->service = service;
}
......@@ -29,11 +70,15 @@ class S3Service : public S3Interface {
private:
string getUrl(const string& prefix, const string& schema, const string& host,
const string& bucket, const string& marker);
void parseBucketXML(ListBucketResult* result, xmlNode* root_element, string& marker);
xmlParserCtxtPtr getBucketXML(const string& region, const string& url, const string& prefix,
const S3Credential& cred, const string& marker);
bool checkAndParseBucketXML(ListBucketResult* result, xmlParserCtxtPtr xmlcontext,
string& marker);
HTTPHeaders composeHTTPHeaders(const string& url, const string& marker, const string& prefix,
const string& region, const S3Credential& cred);
......
#ifndef INCLUDE_S3KEYREADER_H_
#define INCLUDE_S3KEYREADER_H_
#ifndef INCLUDE_S3KEY_READER_H_
#define INCLUDE_S3KEY_READER_H_
#include <string>
using std::string;
#include "gps3ext.h"
#include "reader.h"
#include "s3common.h"
#include "s3interface.h"
#include "s3macros.h"
#include "s3restful_service.h"
#include "s3url_parser.h"
#include <vector>
using std::vector;
using std::stringstream;
struct Range {
uint64_t offset;
uint64_t length;
};
class OffsetMgr {
public:
OffsetMgr() : keySize(0), chunkSize(0), curPos(0) {
pthread_mutex_init(&this->offsetLock, NULL);
}
~OffsetMgr() {
pthread_mutex_destroy(&this->offsetLock);
}
Range getNextOffset(); // ret.length == 0 means EOF
uint64_t getChunkSize() const {
return chunkSize;
}
void setChunkSize(uint64_t chunkSize) {
this->chunkSize = chunkSize;
}
uint64_t getKeySize() const {
return keySize;
}
void setKeySize(uint64_t keySize) {
this->keySize = keySize;
}
private:
pthread_mutex_t offsetLock;
uint64_t keySize;
uint64_t chunkSize;
uint64_t curPos;
};
enum ChunkStatus {
ReadyToRead,
ReadyToFill,
};
class ChunkBuffer {
public:
ChunkBuffer(const string& url, OffsetMgr& mgr, bool& sharedError, const S3Credential& cred,
const string& region);
/*ChunkBuffer(const ChunkBuffer& other)
: sourceUrl(other.sourceUrl),
sharedError(other.sharedError),
offsetMgr(other.offsetMgr),
credential(other.credential),
region(other.region),
s3interface(other.s3interface) {
this->chunkData = other.chunkData;
curFileOffset = other.curFileOffset;
chunkDataSize = other.chunkDataSize;
status = other.status;
eof = other.eof;
curChunkOffset = other.curChunkOffset;
}*/
~ChunkBuffer();
bool isEOF() {
return this->eof;
}
// Error is shared among all ChunkBuffers of a KeyReader.
bool isError() {
return this->sharedError;
}
void setError() {
this->sharedError = true;
}
uint64_t read(char* buf, uint64_t len);
uint64_t fill();
void setS3interface(S3Interface* s3) {
this->s3interface = s3;
}
void init();
void destroy();
protected:
string sourceUrl;
private:
bool eof;
bool& sharedError;
ChunkStatus status;
pthread_mutex_t stat_mutex;
pthread_cond_t stat_cond;
uint64_t curFileOffset;
uint64_t curChunkOffset;
uint64_t chunkDataSize;
char* chunkData;
OffsetMgr& offsetMgr;
const S3Credential& credential;
const string& region;
S3Interface* s3interface;
};
class S3KeyReader : public Reader {
public:
S3KeyReader();
virtual ~S3KeyReader();
S3KeyReader()
: sharedError(false),
numOfChunks(0),
curReadingChunk(0),
transferredKeyLen(0),
s3interface(NULL) {
}
virtual ~S3KeyReader() {
}
void open(const ReaderParams& params);
uint64_t read(char* buf, uint64_t count);
void close();
void setS3interface(S3Interface* s3) {
this->s3interface = s3;
}
private:
bool sharedError;
uint64_t numOfChunks;
uint64_t curReadingChunk;
uint64_t transferredKeyLen;
string region;
OffsetMgr offsetMgr;
vector<ChunkBuffer> chunkBuffers;
vector<pthread_t> threads;
S3Interface* s3interface;
};
#endif /* INCLUDE_S3KEYREADER_H_ */
#endif /* INCLUDE_S3KEYREADER_H_ */
\ No newline at end of file
#ifndef INCLUDE_S3RESTUL_SERVICE_H_
#define INCLUDE_S3RESTUL_SERVICE_H_
#ifndef INCLUDE_S3RESTFUL_SERVICE_H_
#define INCLUDE_S3RESTFUL_SERVICE_H_
#include "restful_service.h"
......@@ -10,4 +10,4 @@ class S3RESTfulService : public RESTfulService {
Response get(const string& url, HTTPHeaders& headers, const map<string, string>& params);
};
#endif /* INCLUDE_S3RESTUL_SERVICE_H_ */
#endif /* INCLUDE_S3RESTFUL_SERVICE_H_ */
\ No newline at end of file
......@@ -82,8 +82,8 @@ Datum s3_import(PG_FUNCTION_ARGS) {
s3reader = reader_init(url_with_options);
if (!s3reader) {
ereport(ERROR, (0, errmsg("Failed to init S3 extension, segid = "
"%d, segnum = %d, please check your "
ereport(ERROR, (0, errmsg("Failed to init S3 extension, segid = %d, "
"segnum = %d, please check your "
"configurations and net connection",
s3ext_segid, s3ext_segnum)));
}
......
#include "restful_service.h"
RESTfulService::RESTfulService() {
// TODO Auto-generated constructor stub
}
RESTfulService::~RESTfulService() {
// TODO Auto-generated destructor stub
}
\ No newline at end of file
......@@ -4,11 +4,11 @@
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include "gpreader.h"
#include "gps3ext.h"
#include "reader.h"
#include "reader_params.h"
#include "s3bucket_reader.h"
#include "s3common.h"
#include "s3conf.h"
#include "s3log.h"
#include "s3macros.h"
......@@ -35,12 +35,8 @@ S3BucketReader::~S3BucketReader() {
this->close();
}
void S3BucketReader::setS3interface(S3Interface *s3) {
this->s3interface = s3;
}
void S3BucketReader::open(const ReaderParams &params) {
this->url = params.getUrl();
this->url = params.getUrlToLoad();
this->segId = params.getSegId();
this->segNum = params.getSegNum();
this->cred = params.getCred();
......@@ -61,14 +57,14 @@ BucketContent *S3BucketReader::getNextKey() {
return this->keyList->contents[this->keyIndex];
}
const ReaderParams &S3BucketReader::getReaderParams(BucketContent *key) {
ReaderParams *params = new ReaderParams();
params->setKeyUrl(this->getKeyURL(key->getName()));
params->setRegion(this->region);
params->setSize(key->getSize());
params->setChunkSize(this->chunkSize);
S3DEBUG("key: %s, size: %" PRIu64, params->getKeyUrl().c_str(), params->getSize());
return *params;
ReaderParams S3BucketReader::getReaderParams(BucketContent *key) {
ReaderParams params = ReaderParams();
params.setKeyUrl(this->getKeyURL(key->getName()));
params.setRegion(this->region);
params.setKeySize(key->getSize());
params.setChunkSize(this->chunkSize);
S3DEBUG("key: %s, size: %" PRIu64, params.getKeyUrl().c_str(), params.getKeySize());
return params;
}
uint64_t S3BucketReader::read(char *buf, uint64_t count) {
......@@ -124,7 +120,7 @@ ListBucketResult *S3BucketReader::listBucketWithRetry(int retries) {
CHECK_OR_DIE(this->s3interface != NULL);
while (retries--) {
ListBucketResult *result = this->s3interface->ListBucket(
ListBucketResult *result = this->s3interface->listBucket(
this->schema, this->region, this->bucket, this->prefix, this->cred);
if (result != NULL) {
return result;
......@@ -203,4 +199,4 @@ void S3BucketReader::validateURL() {
bool ok = !(this->schema.empty() || this->region.empty() || this->bucket.empty());
CHECK_OR_DIE_MSG(ok, "'%s' is not valid", this->url.c_str());
}
\ No newline at end of file
}
......@@ -11,9 +11,9 @@
#include "gps3ext.h"
#include "s3http_headers.h"
#include "s3key_reader.h"
#include "s3log.h"
#include "s3macros.h"
#include "s3reader.h"
#include "s3url_parser.h"
#include "s3utils.h"
......@@ -238,7 +238,7 @@ void S3Service::parseBucketXML(ListBucketResult *result, xmlNode *root_element,
// service unstable, so that caller could retry.
//
// Caller should delete returned object.
ListBucketResult *S3Service::ListBucket(const string &schema, const string &region,
ListBucketResult *S3Service::listBucket(const string &schema, const string &region,
const string &bucket, const string &prefix,
const S3Credential &cred) {
stringstream host;
......@@ -269,4 +269,43 @@ ListBucketResult *S3Service::ListBucket(const string &schema, const string &regi
} while (!marker.empty());
return result;
}
uint64_t S3Service::fetchData(uint64_t offset, char *data, uint64_t len, const string &sourceUrl,
const string &region, const S3Credential &cred) {
CHECK_OR_DIE(data != NULL);
HTTPHeaders headers;
map<string, string> params;
UrlParser parser(sourceUrl.c_str());
char rangeBuf[128] = {0};
snprintf(rangeBuf, 128, "bytes=%" PRIu64 "-%" PRIu64, offset, offset + len - 1);
headers.Add(HOST, parser.Host());
headers.Add(RANGE, rangeBuf);
headers.Add(X_AMZ_CONTENT_SHA256, "UNSIGNED-PAYLOAD");
SignRequestV4("GET", &headers, region, parser.Path(), "", cred);
Response resp = service->get(sourceUrl, headers, params);
if (resp.getStatus() == OK) {
vector<uint8_t> &responseData = resp.getRawData();
CHECK_OR_DIE_MSG(responseData.size() == len, "%s", "Response is not fully received.");
std::copy(responseData.begin(), responseData.end(), data);
return responseData.size();
} else {
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
}
return 0;
}
ListBucketResult::~ListBucketResult() {
vector<BucketContent *>::iterator i;
for (i = this->contents.begin(); i != this->contents.end(); i++) {
delete *i;
}
}
\ No newline at end of file
#include "s3keyreader.h"
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
S3KeyReader::S3KeyReader() {
// TODO Auto-generated constructor stub
#include "s3common.h"
#include "s3interface.h"
#include "s3key_reader.h"
// Return (offset, length) of next chunk to download,
// or (fileSize, 0) if reach end of file.
Range OffsetMgr::getNextOffset() {
Range ret;
pthread_mutex_lock(&this->offsetLock);
if (this->curPos < this->keySize) {
ret.offset = this->curPos;
} else {
ret.offset = this->keySize;
}
if (this->curPos + this->chunkSize > this->keySize) {
ret.length = this->keySize - this->curPos;
this->curPos = this->keySize;
} else {
ret.length = this->chunkSize;
this->curPos += this->chunkSize;
}
pthread_mutex_unlock(&this->offsetLock);
return ret;
}
ChunkBuffer::ChunkBuffer(const string& url, OffsetMgr& mgr, bool& sharedError,
const S3Credential& cred, const string& region)
: sourceUrl(url),
sharedError(sharedError),
chunkData(NULL),
offsetMgr(mgr),
credential(cred),
region(region),
s3interface(NULL) {
Range range = mgr.getNextOffset();
curFileOffset = range.offset;
chunkDataSize = range.length;
status = ReadyToFill;
eof = false;
curChunkOffset = 0;
}
ChunkBuffer::~ChunkBuffer() {
this->destroy();
}
// Copy constructor will copy members, but chunkData must not be initialized before copy.
// otherwise when worked with vector it will be freed twice.
void ChunkBuffer::init() {
if (chunkData != NULL) {
S3ERROR("Error: reinitializing chunkBuffer.");
return;
}
chunkData = new char[offsetMgr.getChunkSize()];
CHECK_OR_DIE_MSG(chunkData != NULL, "%s", "Failed to allocate Buffer, no enough memory?");
pthread_mutex_init(&this->stat_mutex, NULL);
pthread_cond_init(&this->stat_cond, NULL);
}
void ChunkBuffer::destroy() {
if (this->chunkData != NULL) {
delete this->chunkData;
this->chunkData = NULL;
pthread_mutex_destroy(&this->stat_mutex);
pthread_cond_destroy(&this->stat_cond);
}
}
// ret < len means EMPTY
// that's why it checks if leftLen is larger than *or equal to* len below[1], provides a chance ret
// is 0, which is smaller than len. Otherwise, other functions won't know when to read next buffer.
uint64_t ChunkBuffer::read(char* buf, uint64_t len) {
// QueryCancelPending stops s3_import(), this check is not needed if
// s3_import() every time calls ChunkBuffer->Read() only once, otherwise(as we did in
// downstreamReader->read() for decompression feature before), first call sets buffer to
// ReadyToFill, second call hangs.
CHECK_OR_DIE_MSG(!QueryCancelPending, "%s", "ChunkBuffer reading is interrupted by GPDB");
pthread_mutex_lock(&this->stat_mutex);
while (this->status != ReadyToRead) {
pthread_cond_wait(&this->stat_cond, &this->stat_mutex);
}
// Error is shared between all chunks.
if (this->isError()) {
pthread_mutex_unlock(&this->stat_mutex);
CHECK_OR_DIE_MSG(false, "%s", "ChunkBuffers encounter a downloading error.");
}
uint64_t leftLen = this->chunkDataSize - this->curChunkOffset;
uint64_t lenToRead = std::min(len, leftLen);
if (lenToRead != 0) {
memcpy(buf, this->chunkData + this->curChunkOffset, lenToRead);
}
if (len <= leftLen) { // [1]
this->curChunkOffset += lenToRead; // not empty
} else { // empty, reset everything
this->curChunkOffset = 0;
if (!this->isEOF()) {
this->status = ReadyToFill;
Range range = this->offsetMgr.getNextOffset();
this->curFileOffset = range.offset;
this->chunkDataSize = range.length;
pthread_cond_signal(&this->stat_cond);
}
}
pthread_mutex_unlock(&this->stat_mutex);
return lenToRead;
}
// returning -1 means error
uint64_t ChunkBuffer::fill() {
pthread_mutex_lock(&this->stat_mutex);
while (this->status != ReadyToFill) {
pthread_cond_wait(&this->stat_cond, &this->stat_mutex);
}
uint64_t offset = this->curFileOffset;
uint64_t leftLen = this->chunkDataSize;
uint64_t readLen = 0;
if (leftLen != 0) {
readLen = this->s3interface->fetchData(offset, this->chunkData, leftLen, this->sourceUrl,
this->region, this->credential);
if (readLen != leftLen) {
S3DEBUG("Failed to fetch expected data from S3");
this->sharedError = true;
} else {
S3DEBUG("Got %" PRIu64 " bytes from S3", readLen);
}
} else {
readLen = 0; // Nothing to read, EOF
S3DEBUG("Reached the end of file");
this->eof = true;
}
this->status = ReadyToRead;
pthread_cond_signal(&this->stat_cond);
pthread_mutex_unlock(&this->stat_mutex);
return (this->sharedError) ? -1 : readLen;
}
S3KeyReader::~S3KeyReader() {
// TODO Auto-generated destructor stub
}
\ No newline at end of file
void* DownloadThreadFunc(void* data) {
ChunkBuffer* buffer = static_cast<ChunkBuffer*>(data);
uint64_t filledSize = 0;
S3DEBUG("Downloading thread starts");
do {
if (QueryCancelPending) {
S3INFO("Downloading thread is interrupted by GPDB");
// error is shared between all chunks, so all chunks will stop.
buffer->setError();
return NULL;
}
filledSize = buffer->fill();
if (filledSize != 0) {
if (buffer->isError()) {
S3DEBUG("Failed to fill downloading buffer");
break;
} else {
S3DEBUG("Size of filled data is %" PRIu64, filledSize);
}
}
} while (!buffer->isEOF());
S3DEBUG("Downloading thread ended");
return NULL;
}
void S3KeyReader::open(const ReaderParams& params) {
this->numOfChunks = params.getNumOfChunks();
CHECK_OR_DIE_MSG(this->numOfChunks > 0, "%s", "numOfChunks must not be zero");
this->region = params.getRegion();
this->offsetMgr.setKeySize(params.getKeySize());
this->offsetMgr.setChunkSize(params.getChunkSize());
for (uint64_t i = 0; i < this->numOfChunks; i++) {
// when vector reallocate memory, it will copy object.
// chunkData must be initialized after all copy.
this->chunkBuffers.push_back(ChunkBuffer(params.getKeyUrl(), this->offsetMgr,
this->sharedError, params.getCred(),
this->region));
}
for (uint64_t i = 0; i < this->numOfChunks; i++) {
this->chunkBuffers[i].init();
this->chunkBuffers[i].setS3interface(this->s3interface);
pthread_t thread;
pthread_create(&thread, NULL, DownloadThreadFunc, &this->chunkBuffers[i]);
this->threads.push_back(thread);
}
return;
}
uint64_t S3KeyReader::read(char* buf, uint64_t count) {
uint64_t fileLen = this->offsetMgr.getKeySize();
uint64_t readLen = 0;
do {
// confirm there is no more available data, done with this file
if (this->transferredKeyLen >= fileLen) {
return 0;
}
ChunkBuffer& buffer = chunkBuffers[this->curReadingChunk % this->numOfChunks];
readLen = buffer.read(buf, count);
this->transferredKeyLen += readLen;
if (readLen < count) {
this->curReadingChunk++;
CHECK_OR_DIE_MSG(!buffer.isError(), "%s", "Error occurs while downloading, skip");
}
// retry to confirm whether thread reading is finished or chunk size is
// divisible by get()'s buffer size
} while (readLen == 0);
return readLen;
}
void S3KeyReader::close() {
for (uint64_t i = 0; i < this->threads.size(); i++) {
pthread_cancel(this->threads[i]);
pthread_join(this->threads[i], NULL);
}
threads.clear();
return;
}
此差异已折叠。
......@@ -9,7 +9,7 @@
#include "s3http_headers.h"
#include "s3log.h"
#include "s3macros.h"
#include "s3restul_service.h"
#include "s3restful_service.h"
using namespace std;
......@@ -43,10 +43,15 @@ Response S3RESTfulService::get(const string &url, HTTPHeaders &headers,
curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
curl_easy_setopt(curl, CURLOPT_FORBID_REUSE, 1L);
curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers.GetList());
curl_easy_setopt(curl, CURLOPT_WRITEDATA, (void *)&response);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, RESTfulServiceCallback);
// consider low speed as timeout
curl_easy_setopt(curl, CURLOPT_LOW_SPEED_LIMIT, s3ext_low_speed_limit);
curl_easy_setopt(curl, CURLOPT_LOW_SPEED_TIME, s3ext_low_speed_time);
map<string, string>::const_iterator iter = params.find("debug");
if (iter != params.end() && iter->second == "true") {
curl_easy_setopt(curl, CURLOPT_VERBOSE, 1L);
......@@ -63,7 +68,7 @@ Response S3RESTfulService::get(const string &url, HTTPHeaders &headers,
response.clearBuffer();
response.setStatus(FAIL);
response.setMessage(
string("failed to talk to s3 service ").append(curl_easy_strerror(res)));
string("Failed to talk to s3 service ").append(curl_easy_strerror(res)));
} else {
response.setStatus(OK);
response.setMessage("Success");
......
......@@ -10,7 +10,7 @@
#include "s3common.h"
#include "s3http_headers.h"
#include "s3reader.h"
#include "s3key_reader.h"
#include "s3uploader.h"
#include "s3url_parser.h"
#include "s3utils.h"
......
#ifndef TEST_MOCK_CLASSES_H_
#define TEST_MOCK_CLASSES_H_
#include "gmock/gmock.h"
#include "s3common.h"
#include "s3interface.h"
#include <string>
using std::string;
class MockS3Interface : public S3Interface {
public:
MOCK_METHOD5(listBucket,
ListBucketResult*(const string& schema, const string& region, const string& bucket,
const string& prefix, const S3Credential& cred));
MOCK_METHOD6(fetchData,
uint64_t(uint64_t offset, char *data, uint64_t len, const string &sourceUrl,
const string &region, const S3Credential &cred));
};
#endif /* TEST_MOCK_CLASSES_H_ */
\ No newline at end of file
#include "s3bucket_reader.cpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "mock_classes.h"
#include "reader_params.h"
#include "s3bucket_reader.cpp"
using ::testing::AtLeast;
using ::testing::Return;
using ::testing::Throw;
using ::testing::_;
// ================== MOCK Object ===================
class MockS3Interface : public S3Interface {
public:
MOCK_METHOD5(ListBucket,
ListBucketResult*(const string& schema, const string& region, const string& bucket,
const string& prefix, const S3Credential& cred));
};
class MockS3Reader : public Reader {
public:
MOCK_METHOD1(open, void(const ReaderParams& params));
......@@ -49,17 +43,17 @@ class S3BucketReaderTest : public testing::Test {
TEST_F(S3BucketReaderTest, OpenInvalidURL) {
string url = "https://s3-us-east-2.amazon.com/s3test.pivotal.io/whatever";
params.setUrl(url);
params.setUrlToLoad(url);
EXPECT_THROW(bucketReader->open(params), std::runtime_error);
}
TEST_F(S3BucketReaderTest, OpenURL) {
ListBucketResult* result = new ListBucketResult();
EXPECT_CALL(s3interface, ListBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3interface, listBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
string url = "https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever";
params.setUrl(url);
params.setUrlToLoad(url);
EXPECT_NO_THROW(bucketReader->open(params));
}
......@@ -76,7 +70,7 @@ TEST_F(S3BucketReaderTest, ListBucketWithRetryThrowExceptionWhenS3InterfaceIsNUL
TEST_F(S3BucketReaderTest, ListBucketWithRetry) {
ListBucketResult* result = new ListBucketResult();
EXPECT_CALL(s3interface, ListBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3interface, listBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_NE((void*)NULL, bucketReader->listBucketWithRetry(1));
}
......@@ -84,7 +78,7 @@ TEST_F(S3BucketReaderTest, ListBucketWithRetry) {
TEST_F(S3BucketReaderTest, ListBucketWithRetries) {
ListBucketResult* result = new ListBucketResult();
EXPECT_CALL(s3interface, ListBucket(_, _, _, _, _))
EXPECT_CALL(s3interface, listBucket(_, _, _, _, _))
.Times(3)
.WillOnce(Return((ListBucketResult*)NULL))
.WillOnce(Return((ListBucketResult*)NULL))
......@@ -99,9 +93,9 @@ TEST_F(S3BucketReaderTest, ReaderThrowExceptionWhenUpstreamReaderIsNULL) {
TEST_F(S3BucketReaderTest, ReaderReturnZeroForEmptyBucket) {
ListBucketResult* result = new ListBucketResult();
EXPECT_CALL(s3interface, ListBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3interface, listBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
params.setUrl("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
params.setUrlToLoad("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
bucketReader->open(params);
bucketReader->setUpstreamReader(&s3reader);
EXPECT_EQ(0, bucketReader->read(buf, sizeof(buf)));
......@@ -112,7 +106,7 @@ TEST_F(S3BucketReaderTest, ReadBucketWithSingleFile) {
BucketContent* item = new BucketContent("foo", 456);
result->contents.push_back(item);
EXPECT_CALL(s3interface, ListBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3interface, listBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3reader, read(_, _))
.Times(3)
......@@ -125,7 +119,7 @@ TEST_F(S3BucketReaderTest, ReadBucketWithSingleFile) {
params.setSegId(0);
params.setSegNum(1);
params.setUrl("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
params.setUrlToLoad("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
bucketReader->open(params);
bucketReader->setUpstreamReader(&s3reader);
......@@ -141,7 +135,7 @@ TEST_F(S3BucketReaderTest, ReadBuckeWithOneEmptyFileOneNonEmptyFile) {
item = new BucketContent("bar", 456);
result->contents.push_back(item);
EXPECT_CALL(s3interface, ListBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3interface, listBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3reader, read(_, _))
.Times(3)
......@@ -154,7 +148,7 @@ TEST_F(S3BucketReaderTest, ReadBuckeWithOneEmptyFileOneNonEmptyFile) {
params.setSegId(0);
params.setSegNum(1);
params.setUrl("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
params.setUrlToLoad("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
bucketReader->open(params);
bucketReader->setUpstreamReader(&s3reader);
......@@ -167,11 +161,11 @@ TEST_F(S3BucketReaderTest, ReaderShouldSkipIfFileIsNotForThisSegment) {
BucketContent* item = new BucketContent("foo", 456);
result->contents.push_back(item);
EXPECT_CALL(s3interface, ListBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3interface, listBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
params.setSegId(10);
params.setSegNum(16);
params.setUrl("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
params.setUrlToLoad("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
bucketReader->open(params);
bucketReader->setUpstreamReader(&s3reader);
......@@ -185,7 +179,7 @@ TEST_F(S3BucketReaderTest, UpstreamReaderThrowException) {
item = new BucketContent("bar", 456);
result->contents.push_back(item);
EXPECT_CALL(s3interface, ListBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3interface, listBucket(_, _, _, _, _)).Times(1).WillOnce(Return(result));
EXPECT_CALL(s3reader, read(_, _))
.Times(AtLeast(1))
......@@ -196,10 +190,10 @@ TEST_F(S3BucketReaderTest, UpstreamReaderThrowException) {
params.setSegId(0);
params.setSegNum(1);
params.setUrl("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
params.setUrlToLoad("https://s3-us-east-2.amazonaws.com/s3test.pivotal.io/whatever");
bucketReader->open(params);
bucketReader->setUpstreamReader(&s3reader);
EXPECT_THROW(bucketReader->read(buf, sizeof(buf)), std::runtime_error);
EXPECT_THROW(bucketReader->read(buf, sizeof(buf)), std::runtime_error);
}
\ No newline at end of file
}
#include "s3interface.cpp"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "restful_service.cpp"
using ::testing::AtLeast;
using ::testing::Return;
......@@ -120,13 +119,13 @@ class S3ServiceTest : public testing::Test {
};
TEST_F(S3ServiceTest, ListBucketThrowExceptionWhenBucketStringIsEmpty) {
EXPECT_THROW(s3service->ListBucket("", "", "", "", cred), std::runtime_error);
EXPECT_THROW(s3service->listBucket("", "", "", "", cred), std::runtime_error);
}
TEST_F(S3ServiceTest, ListBucketWithWrongRegion) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
EXPECT_EQ((void *)NULL, s3service->ListBucket(schema, "nonexist", "", "", cred));
EXPECT_EQ((void *)NULL, s3service->listBucket(schema, "nonexist", "", "", cred));
}
TEST_F(S3ServiceTest, ListBucketWithWrongBucketName) {
......@@ -147,7 +146,7 @@ TEST_F(S3ServiceTest, ListBucketWithWrongBucketName) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
EXPECT_EQ((void *)NULL, s3service->ListBucket(schema, "us-west-2", "foo/bar", "", cred));
EXPECT_EQ((void *)NULL, s3service->listBucket(schema, "us-west-2", "foo/bar", "", cred));
}
TEST_F(S3ServiceTest, ListBucketWithNormalBucket) {
......@@ -164,7 +163,7 @@ TEST_F(S3ServiceTest, ListBucketWithNormalBucket) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
ListBucketResult *result =
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "threebytes/", cred);
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "threebytes/", cred);
ASSERT_NE((void *)NULL, result);
EXPECT_EQ(1, result->contents.size());
}
......@@ -174,7 +173,7 @@ TEST_F(S3ServiceTest, ListBucketWithBucketWith1000Keys) {
.WillOnce(Return(this->buildListBucketResponse(1000, false)));
ListBucketResult *result =
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
ASSERT_NE((void *)NULL, result);
EXPECT_EQ(1000, result->contents.size());
}
......@@ -185,7 +184,7 @@ TEST_F(S3ServiceTest, ListBucketWithBucketWith1001Keys) {
.WillOnce(Return(this->buildListBucketResponse(1, false)));
ListBucketResult *result =
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
ASSERT_NE((void *)NULL, result);
EXPECT_EQ(1001, result->contents.size());
}
......@@ -200,7 +199,7 @@ TEST_F(S3ServiceTest, ListBucketWithBucketWithMoreThan1000Keys) {
.WillOnce(Return(this->buildListBucketResponse(120, false)));
ListBucketResult *result =
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
ASSERT_NE((void *)NULL, result);
EXPECT_EQ(5120, result->contents.size());
}
......@@ -214,7 +213,7 @@ TEST_F(S3ServiceTest, ListBucketWithBucketWithTruncatedResponse) {
.WillOnce(Return(EmptyResponse));
ListBucketResult *result =
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
EXPECT_EQ((void *)NULL, result);
}
......@@ -225,7 +224,7 @@ TEST_F(S3ServiceTest, ListBucketWithBucketWithZeroSizedKeys) {
.WillOnce(Return(this->buildListBucketResponse(120, false, 8)));
ListBucketResult *result =
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
ASSERT_NE((void *)NULL, result);
EXPECT_EQ(1120, result->contents.size());
}
......@@ -235,7 +234,7 @@ TEST_F(S3ServiceTest, ListBucketWithEmptyBucket) {
.WillOnce(Return(this->buildListBucketResponse(0, false, 0)));
ListBucketResult *result =
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
ASSERT_NE((void *)NULL, result);
EXPECT_EQ(0, result->contents.size());
}
......@@ -245,7 +244,7 @@ TEST_F(S3ServiceTest, ListBucketWithAllZeroedFilesBucket) {
.WillOnce(Return(this->buildListBucketResponse(0, false, 2)));
ListBucketResult *result =
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred);
ASSERT_NE((void *)NULL, result);
EXPECT_EQ(0, result->contents.size());
}
......@@ -254,7 +253,7 @@ TEST_F(S3ServiceTest, ListBucketWithErrorResponse) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
EXPECT_EQ((void *)NULL,
s3service->ListBucket(schema, "nonexist", "s3test.pivotal.io", "s3files/", cred));
s3service->listBucket(schema, "nonexist", "s3test.pivotal.io", "s3files/", cred));
}
TEST_F(S3ServiceTest, ListBucketWithErrorReturnedXML) {
......@@ -265,7 +264,7 @@ TEST_F(S3ServiceTest, ListBucketWithErrorReturnedXML) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
EXPECT_EQ((void *)NULL,
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred));
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred));
}
TEST_F(S3ServiceTest, ListBucketWithNonRootXML) {
......@@ -276,5 +275,58 @@ TEST_F(S3ServiceTest, ListBucketWithNonRootXML) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
EXPECT_EQ((void *)NULL,
s3service->ListBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred));
}
\ No newline at end of file
s3service->listBucket(schema, "us-west-2", "s3test.pivotal.io", "s3files/", cred));
}
TEST_F(S3ServiceTest, fetchDataRoutine) {
vector<uint8_t> raw;
srand(time(NULL));
for (int i = 0; i < 100; i++) {
raw.push_back(rand() & 0xFF);
}
Response response(OK, raw);
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
char buffer[100];
uint64_t len = s3service->fetchData(
0, buffer, 100, "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever", region,
cred);
EXPECT_EQ(0, memcmp(buffer, raw.data(), 100));
EXPECT_EQ(100, len);
}
TEST_F(S3ServiceTest, fetchDataNULLBuffer) {
EXPECT_THROW(s3service->fetchData(
0, NULL, 100, "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred),
std::runtime_error);
}
TEST_F(S3ServiceTest, fetchDataFailedResponse) {
vector<uint8_t> raw;
raw.resize(100);
Response response(FAIL, raw);
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
char buffer[100];
EXPECT_THROW(s3service->fetchData(
0, buffer, 100,
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever", region, cred),
std::runtime_error);
}
TEST_F(S3ServiceTest, fetchDataPartialResponse) {
vector<uint8_t> raw;
raw.resize(80);
Response response(OK, raw);
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
char buffer[100];
EXPECT_THROW(s3service->fetchData(
0, buffer, 100,
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever", region, cred),
std::runtime_error);
}
#include "s3keyreader.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "mock_classes.h"
#include "reader_params.h"
#include "s3key_reader.cpp"
using ::testing::AtLeast;
using ::testing::AtMost;
using ::testing::Return;
using ::testing::Throw;
using ::testing::_;
volatile bool QueryCancelPending = false;
// ================== S3KeyReaderTest ===================
class S3KeyReaderTest : public testing::Test {
protected:
// Remember that SetUp() is run immediately before a test starts.
virtual void SetUp() {
QueryCancelPending = false;
keyReader = new S3KeyReader();
keyReader->setS3interface(&s3interface);
}
// TearDown() is invoked immediately after a test finishes.
virtual void TearDown() {
keyReader->close();
delete keyReader;
}
S3KeyReader *keyReader;
ReaderParams params;
char buffer[256];
MockS3Interface s3interface;
};
TEST(OffsetMgr, simple) {
OffsetMgr o;
o.setKeySize(4096);
o.setChunkSize(1000);
EXPECT_EQ(1000, o.getChunkSize());
EXPECT_EQ(4096, o.getKeySize());
Range r = o.getNextOffset();
EXPECT_EQ(r.offset, 0);
EXPECT_EQ(r.length, 1000);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 1000);
EXPECT_EQ(r.length, 1000);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 2000);
EXPECT_EQ(r.length, 1000);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 3000);
EXPECT_EQ(r.length, 1000);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 4000);
EXPECT_EQ(r.length, 96);
EXPECT_EQ(1000, o.getChunkSize());
EXPECT_EQ(4096, o.getKeySize());
}
TEST(OffsetMgr, KeySizeSmallerThanChunkSize) {
OffsetMgr o;
o.setKeySize(127);
o.setChunkSize(1000);
EXPECT_EQ(1000, o.getChunkSize());
EXPECT_EQ(127, o.getKeySize());
Range r = o.getNextOffset();
EXPECT_EQ(r.offset, 0);
EXPECT_EQ(r.length, 127);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 127);
EXPECT_EQ(r.length, 0);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 127);
EXPECT_EQ(r.length, 0);
}
TEST_F(S3KeyReaderTest, OpenWithZeroChunk) {
params.setNumOfChunks(0);
EXPECT_THROW(keyReader->open(params), std::runtime_error);
}
TEST_F(S3KeyReaderTest, ReadWithSingleChunk) {
params.setNumOfChunks(1);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(8192);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _)).WillOnce(Return(255));
keyReader->open(params);
EXPECT_EQ(255, keyReader->read(buffer, 64 * 1024));
EXPECT_EQ(0, keyReader->read(buffer, 64 * 1024));
}
TEST_F(S3KeyReaderTest, ReadWithSingleChunkNormalCase) {
// Read buffer < chunk size < key size
params.setNumOfChunks(1);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _))
.WillOnce(Return(64))
.WillOnce(Return(64))
.WillOnce(Return(64))
.WillOnce(Return(63));
keyReader->open(params);
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(31, keyReader->read(buffer, 32));
EXPECT_EQ(0, keyReader->read(buffer, 32));
}
TEST_F(S3KeyReaderTest, ReadWithSmallBuffer) {
params.setNumOfChunks(1);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(8192);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _)).WillOnce(Return(255));
keyReader->open(params);
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(63, keyReader->read(buffer, 64));
EXPECT_EQ(0, keyReader->read(buffer, 64));
}
TEST_F(S3KeyReaderTest, ReadWithSmallKeySize) {
params.setNumOfChunks(1);
params.setRegion("us-west-2");
params.setKeySize(2);
params.setChunkSize(8192);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _)).WillOnce(Return(2));
keyReader->open(params);
EXPECT_EQ(2, keyReader->read(buffer, 64));
EXPECT_EQ(0, keyReader->read(buffer, 64));
}
TEST_F(S3KeyReaderTest, ReadWithSmallChunk) {
params.setNumOfChunks(1);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _))
.WillOnce(Return(64))
.WillOnce(Return(64))
.WillOnce(Return(64))
.WillOnce(Return(63));
keyReader->open(params);
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(63, keyReader->read(buffer, 64));
EXPECT_EQ(0, keyReader->read(buffer, 64));
}
TEST_F(S3KeyReaderTest, ReadWithSmallChunkDividedKeySize) {
params.setNumOfChunks(1);
params.setRegion("us-west-2");
params.setKeySize(256);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _))
.WillOnce(Return(64))
.WillOnce(Return(64))
.WillOnce(Return(64))
.WillOnce(Return(64));
keyReader->open(params);
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(64, keyReader->read(buffer, 64));
EXPECT_EQ(0, keyReader->read(buffer, 64));
}
TEST_F(S3KeyReaderTest, ReadWithChunkLargerThanReadBufferAndKeySize) {
params.setNumOfChunks(1);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(8192);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _)).WillOnce(Return(255));
keyReader->open(params);
EXPECT_EQ(255, keyReader->read(buffer, 255));
EXPECT_EQ(0, keyReader->read(buffer, 255));
}
TEST_F(S3KeyReaderTest, ReadWithKeyLargerThanChunkSize) {
params.setNumOfChunks(1);
params.setRegion("us-west-2");
params.setKeySize(1024);
params.setChunkSize(255);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _))
.WillOnce(Return(255))
.WillOnce(Return(255))
.WillOnce(Return(255))
.WillOnce(Return(255))
.WillOnce(Return(4));
keyReader->open(params);
EXPECT_EQ(255, keyReader->read(buffer, 255));
EXPECT_EQ(255, keyReader->read(buffer, 255));
EXPECT_EQ(255, keyReader->read(buffer, 255));
EXPECT_EQ(255, keyReader->read(buffer, 255));
EXPECT_EQ(4, keyReader->read(buffer, 255));
EXPECT_EQ(0, keyReader->read(buffer, 255));
}
TEST_F(S3KeyReaderTest, ReadWithSameKeyChunkReadSize) {
params.setNumOfChunks(1);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(255);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _)).WillOnce(Return(255));
keyReader->open(params);
EXPECT_EQ(255, keyReader->read(buffer, 255));
EXPECT_EQ(0, keyReader->read(buffer, 255));
}
TEST_F(S3KeyReaderTest, MTReadWith2Chunks) {
params.setNumOfChunks(2);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(0, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(64, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(128, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(192, _, _, _, _, _)).WillOnce(Return(63));
keyReader->open(params);
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(31, keyReader->read(buffer, 32));
EXPECT_EQ(0, keyReader->read(buffer, 32));
}
TEST_F(S3KeyReaderTest, MTReadWithRedundantChunks) {
params.setNumOfChunks(8);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(0, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(64, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(128, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(192, _, _, _, _, _)).WillOnce(Return(63));
keyReader->open(params);
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(31, keyReader->read(buffer, 32));
EXPECT_EQ(0, keyReader->read(buffer, 32));
}
TEST_F(S3KeyReaderTest, MTReadWithReusedAndUnreusedChunks) {
params.setNumOfChunks(3);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(0, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(64, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(128, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(192, _, _, _, _, _)).WillOnce(Return(63));
keyReader->open(params);
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(32, keyReader->read(buffer, 32));
EXPECT_EQ(31, keyReader->read(buffer, 32));
EXPECT_EQ(0, keyReader->read(buffer, 32));
}
TEST_F(S3KeyReaderTest, MTReadWithChunksSmallerThanReadBuffer) {
params.setNumOfChunks(3);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(0, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(64, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(128, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(192, _, _, _, _, _)).WillOnce(Return(63));
keyReader->open(params);
EXPECT_EQ(64, keyReader->read(buffer, 127));
EXPECT_EQ(64, keyReader->read(buffer, 127));
EXPECT_EQ(64, keyReader->read(buffer, 127));
EXPECT_EQ(63, keyReader->read(buffer, 127));
EXPECT_EQ(0, keyReader->read(buffer, 127));
}
TEST_F(S3KeyReaderTest, MTReadWithFragmentalReadRequests) {
params.setNumOfChunks(5);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(0, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(64, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(128, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(192, _, _, _, _, _)).WillOnce(Return(63));
keyReader->open(params);
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(1, keyReader->read(buffer, 31));
EXPECT_EQ(0, keyReader->read(buffer, 31));
}
TEST_F(S3KeyReaderTest, MTReadWithFetchDataError) {
params.setNumOfChunks(3);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(0, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(64, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(128, _, _, _, _, _)).WillOnce(Return(-1));
EXPECT_CALL(s3interface, fetchData(192, _, _, _, _, _)).Times(AtMost(1)).WillOnce(Return(63));
keyReader->open(params);
try {
keyReader->read(buffer, 127);
} catch (...) {
}
try {
keyReader->read(buffer, 127);
} catch (...) {
}
EXPECT_THROW(keyReader->read(buffer, 127), std::runtime_error);
EXPECT_THROW(keyReader->read(buffer, 127), std::runtime_error);
}
TEST_F(S3KeyReaderTest, MTReadWithUnexpectedFetchData) {
params.setNumOfChunks(3);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(0, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(64, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(128, _, _, _, _, _)).WillOnce(Return(42));
EXPECT_CALL(s3interface, fetchData(192, _, _, _, _, _)).Times(AtMost(1)).WillOnce(Return(63));
keyReader->open(params);
try {
keyReader->read(buffer, 127);
} catch (...) {
}
try {
keyReader->read(buffer, 127);
} catch (...) {
}
EXPECT_THROW(keyReader->read(buffer, 127), std::runtime_error);
EXPECT_THROW(keyReader->read(buffer, 127), std::runtime_error);
}
TEST_F(S3KeyReaderTest, MTReadWithUnexpectedFetchDataAtSecondRound) {
params.setNumOfChunks(2);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(0, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(64, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(128, _, _, _, _, _)).WillOnce(Return(42));
EXPECT_CALL(s3interface, fetchData(192, _, _, _, _, _)).Times(AtMost(1)).WillOnce(Return(63));
keyReader->open(params);
try {
keyReader->read(buffer, 127);
} catch (...) {
}
try {
keyReader->read(buffer, 127);
} catch (...) {
}
EXPECT_THROW(keyReader->read(buffer, 127), std::runtime_error);
EXPECT_THROW(keyReader->read(buffer, 127), std::runtime_error);
}
TEST_F(S3KeyReaderTest, MTReadWithGPDBCancel) {
params.setNumOfChunks(3);
params.setRegion("us-west-2");
params.setKeySize(255);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(0, _, _, _, _, _)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(64, _, _, _, _, _)).Times(AtMost(1)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(128, _, _, _, _, _)).Times(AtMost(1)).WillOnce(Return(64));
EXPECT_CALL(s3interface, fetchData(192, _, _, _, _, _)).Times(AtMost(1)).WillOnce(Return(63));
keyReader->open(params);
EXPECT_EQ(64, keyReader->read(buffer, 127));
QueryCancelPending = true;
// Note for coverage test, due to multithread execution, in this test case
// QueryCancelPending may or may not be hit in DownloadThreadFunc,
// QueryCancelPending in ChunkBuffer::read will always be hit and throw.
EXPECT_THROW(keyReader->read(buffer, 127), std::runtime_error);
}
\ No newline at end of file
#include "s3reader.cpp"
#include "gtest/gtest.h"
volatile bool QueryCancelPending = false;
TEST(OffsetMgr, simple) {
OffsetMgr *o = new OffsetMgr(4096, 1000);
Range r = o->NextOffset();
EXPECT_EQ(r.offset, 0);
EXPECT_EQ(r.len, 1000);
r = o->NextOffset();
EXPECT_EQ(r.offset, 1000);
EXPECT_EQ(r.len, 1000);
r = o->NextOffset();
EXPECT_EQ(r.offset, 2000);
EXPECT_EQ(r.len, 1000);
r = o->NextOffset();
EXPECT_EQ(r.offset, 3000);
EXPECT_EQ(r.len, 1000);
r = o->NextOffset();
EXPECT_EQ(r.offset, 4000);
EXPECT_EQ(r.len, 96);
delete o;
}
TEST(OffsetMgr, reset) {
OffsetMgr *o = new OffsetMgr(1024, 100);
o->NextOffset();
o->NextOffset();
o->Reset(333);
Range r = o->NextOffset();
EXPECT_EQ(r.offset, 333);
EXPECT_EQ(r.len, 100);
delete o;
}
\ No newline at end of file
......@@ -50,7 +50,7 @@ TEST(S3RESTfulService, GetWithoutURL) {
Response resp = service.get(url, headers, params);
EXPECT_EQ(FAIL, resp.getStatus());
EXPECT_EQ("failed to talk to s3 service URL using bad/illegal format or missing URL",
EXPECT_EQ("Failed to talk to s3 service URL using bad/illegal format or missing URL",
resp.getMessage());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册