Commit d0fe2b66 authored by Peifeng Qiu, committed by Adam Lee

s3ext: refactor error message handling

1. add global error message variable to report to gpdb console.
2. add try/catch block around fetchData() call to catch error message.
3. refactor shared error mechanism between ChunkBuffers (see the sketch below).
4. fix unit tests.

Also fixed a memory leak, thanks to Kuien.
Signed-off-by: Haozhou Wang <hawang@pivotal.io>
Signed-off-by: Kuien Liu <kliu@pivotal.io>
Parent 2e1646a9
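The shared-error mechanism in point 3 comes down to a small pattern: each ChunkBuffer keeps a reference to its owning S3KeyReader, and a failed fetch records a flag plus message on the reader behind a mutex instead of throwing out of the download thread. Below is a minimal, self-contained sketch of that pattern; DemoReader and DemoBuffer are illustrative names only, the real classes are S3KeyReader and ChunkBuffer in the diff that follows.

// Hypothetical DemoReader/DemoBuffer, mirroring S3KeyReader/ChunkBuffer below.
#include <pthread.h>
#include <stdexcept>
#include <string>
#include <iostream>

class DemoReader {
   public:
    DemoReader() : sharedError(false) {
        pthread_mutex_init(&mutexErrorMessage, NULL);
    }
    ~DemoReader() {
        pthread_mutex_destroy(&mutexErrorMessage);
    }
    // Called from any download thread; the mutex keeps flag and message consistent.
    void setSharedError(bool error, const std::string& message) {
        pthread_mutex_lock(&mutexErrorMessage);
        sharedErrorMessage = message;
        sharedError = error;
        pthread_mutex_unlock(&mutexErrorMessage);
    }
    bool isSharedError() const {
        return sharedError;
    }
    const std::string& getSharedErrorMessage() const {
        return sharedErrorMessage;
    }

   private:
    pthread_mutex_t mutexErrorMessage;
    bool sharedError;
    std::string sharedErrorMessage;
};

class DemoBuffer {
   public:
    explicit DemoBuffer(DemoReader& reader) : sharedKeyReader(reader) {
    }
    // A failed fetch marks the shared error instead of letting the exception
    // escape the thread, so sibling buffers can observe it and return early.
    void fill() {
        try {
            throw std::runtime_error("simulated S3 fetch failure");
        } catch (std::exception& e) {
            sharedKeyReader.setSharedError(true, e.what());
        }
    }

   private:
    DemoReader& sharedKeyReader;
};

int main() {
    DemoReader reader;
    DemoBuffer buffer(reader);
    buffer.fill();
    if (reader.isSharedError()) {
        std::cout << "shared error: " << reader.getSharedErrorMessage() << std::endl;
    }
    return 0;
}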
......@@ -8,6 +8,8 @@
#include "s3interface.h"
#include "s3key_reader.h"
extern string gpReaderErrorMessage;
class GPReader : public Reader {
public:
GPReader(const string &url);
......
......@@ -74,69 +74,7 @@ enum ChunkStatus {
ReadyToFill,
};
class ChunkBuffer {
public:
ChunkBuffer(const string& url, OffsetMgr& mgr, bool& sharedError, const S3Credential& cred,
const string& region);
~ChunkBuffer();
// In C++98, if a class has a reference member, it can't be copy-assigned
// by default, so we need to implement operator= explicitly.
ChunkBuffer& operator=(const ChunkBuffer& other);
bool isEOF() {
return this->eof;
}
// Error is shared among all ChunkBuffers of a KeyReader.
bool isError() {
return this->sharedError;
}
void setError() {
this->sharedError = true;
}
uint64_t read(char* buf, uint64_t len);
uint64_t fill();
void setS3interface(S3Interface* s3) {
this->s3interface = s3;
}
void init();
void destroy();
const pthread_cond_t* getStatCond() const {
return &stat_cond;
}
void setStatus(ChunkStatus status) {
this->status = status;
}
protected:
string sourceUrl;
private:
bool eof;
bool& sharedError;
ChunkStatus status;
pthread_mutex_t stat_mutex;
pthread_cond_t stat_cond;
uint64_t curFileOffset;
uint64_t curChunkOffset;
uint64_t chunkDataSize;
char* chunkData;
OffsetMgr& offsetMgr;
const S3Credential& credential;
const string& region;
S3Interface* s3interface;
};
class ChunkBuffer;
class S3KeyReader : public Reader {
public:
......@@ -146,8 +84,10 @@ class S3KeyReader : public Reader {
curReadingChunk(0),
transferredKeyLen(0),
s3interface(NULL) {
pthread_mutex_init(&this->mutexErrorMessage, NULL);
}
virtual ~S3KeyReader() {
pthread_mutex_destroy(&this->mutexErrorMessage);
}
void open(const ReaderParams& params);
......@@ -170,6 +110,13 @@ class S3KeyReader : public Reader {
return sharedError;
}
void setSharedError(bool sharedError, string message) {
pthread_mutex_lock(&this->mutexErrorMessage);
this->sharedErrorMessage = message;
this->sharedError = sharedError;
pthread_mutex_unlock(&this->mutexErrorMessage);
}
const vector<pthread_t>& getThreads() const {
return threads;
}
......@@ -178,8 +125,24 @@ class S3KeyReader : public Reader {
return transferredKeyLen;
}
const S3Credential& getCredential() const {
return credential;
}
OffsetMgr& getOffsetMgr() {
return offsetMgr;
}
const string& getRegion() const {
return region;
}
private:
pthread_mutex_t mutexErrorMessage;
bool sharedError;
string sharedErrorMessage;
uint64_t numOfChunks;
uint64_t curReadingChunk;
uint64_t transferredKeyLen;
......@@ -195,4 +158,67 @@ class S3KeyReader : public Reader {
void reset();
};
class ChunkBuffer {
public:
ChunkBuffer(const string& url, S3KeyReader& reader);
~ChunkBuffer();
// In C++98, if a class has a reference member, it can't be copy-assigned
// by default, so we need to implement operator= explicitly.
ChunkBuffer& operator=(const ChunkBuffer& other);
bool isEOF() {
return this->eof;
}
// Error is shared among all ChunkBuffers of a KeyReader.
bool isError() {
return this->sharedKeyReader.isSharedError();
}
uint64_t read(char* buf, uint64_t len);
uint64_t fill();
void setS3interface(S3Interface* s3) {
this->s3interface = s3;
}
void init();
void destroy();
pthread_cond_t* getStatCond() {
return &statusCondVar;
}
void setStatus(ChunkStatus status) {
this->status = status;
}
void setSharedError(bool sharedError, string message) {
this->sharedKeyReader.setSharedError(sharedError, message);
}
protected:
string sourceUrl;
private:
bool eof;
ChunkStatus status;
pthread_mutex_t statusMutex;
pthread_cond_t statusCondVar;
uint64_t curFileOffset;
uint64_t curChunkOffset;
uint64_t chunkDataSize;
char* chunkData;
OffsetMgr& offsetMgr;
S3Interface* s3interface;
S3KeyReader& sharedKeyReader;
};
#endif /* INCLUDE_S3KEYREADER_H_ */
......@@ -61,6 +61,8 @@ int thread_cleanup(void) {
return 1;
}
string gpReaderErrorMessage;
GPReader::GPReader(const string& url) {
constructReaderParam(url);
restfulServicePtr = &restfulService;
......@@ -101,7 +103,7 @@ void GPReader::close() {
// invoked by s3_import(), need to be exception safe
GPReader* reader_init(const char* url_with_options) {
GPReader* reader = NULL;
gpReaderErrorMessage.clear();
try {
if (!url_with_options) {
return NULL;
......@@ -139,6 +141,7 @@ GPReader* reader_init(const char* url_with_options) {
delete reader;
}
S3ERROR("reader_init caught an exception: %s, aborting", e.what());
gpReaderErrorMessage = e.what();
return NULL;
}
}
......@@ -160,6 +163,7 @@ bool reader_transfer_data(GPReader* reader, char* data_buf, int& data_len) {
data_len = (int)read_len;
} catch (std::exception& e) {
S3ERROR("reader_transfer_data caught an exception: %s, aborting", e.what());
gpReaderErrorMessage = e.what();
return false;
}
......@@ -179,6 +183,7 @@ bool reader_cleanup(GPReader** reader) {
}
} catch (std::exception& e) {
S3ERROR("reader_cleanup caught an exception: %s, aborting", e.what());
gpReaderErrorMessage = e.what();
result = false;
}
......
......@@ -65,7 +65,8 @@ Datum s3_import(PG_FUNCTION_ARGS) {
thread_cleanup();
if (!reader_cleanup(&gpreader)) {
ereport(ERROR, (0, errmsg("Failed to cleanup S3 extention")));
ereport(ERROR, (0, errmsg("Failed to cleanup S3 extension: %s",
gpReaderErrorMessage.c_str())));
}
EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL);
......@@ -83,8 +84,8 @@ Datum s3_import(PG_FUNCTION_ARGS) {
if (!gpreader) {
ereport(ERROR, (0, errmsg("Failed to init S3 extension, segid = %d, "
"segnum = %d, please check your "
"configurations and net connection",
s3ext_segid, s3ext_segnum)));
"configurations and net connection: %s",
s3ext_segid, s3ext_segnum, gpReaderErrorMessage.c_str())));
}
check_essential_config();
......@@ -96,7 +97,8 @@ Datum s3_import(PG_FUNCTION_ARGS) {
int32 data_len = EXTPROTOCOL_GET_DATALEN(fcinfo);
if (!reader_transfer_data(gpreader, data_buf, data_len)) {
ereport(ERROR, (0, errmsg("s3_import: could not read data")));
ereport(ERROR,
(0, errmsg("s3_import: could not read data: %s", gpReaderErrorMessage.c_str())));
}
PG_RETURN_INT32(data_len);
}
......
......@@ -259,18 +259,17 @@ ListBucketResult *S3Service::listBucket(const string &schema, const string &regi
// S3 requires query parameters specified alphabetically.
string url = this->getUrl(prefix, schema, host.str(), bucket, marker);
xmlParserCtxtPtr xmlcontext = NULL;
XMLContextHolder holder(xmlcontext);
Response response = getBucketResponse(region, url, prefix, cred, marker);
xmlcontext = getXMLContext(response);
xmlParserCtxtPtr xmlContext = getXMLContext(response);
XMLContextHolder holder(xmlContext);
if (response.isSuccess()) {
if (parseBucketXML(result, xmlcontext, marker)) {
if (parseBucketXML(result, xmlContext, marker)) {
continue;
}
} else {
parseXMLMessage(xmlcontext);
parseXMLMessage(xmlContext);
}
delete result;
......@@ -307,14 +306,23 @@ uint64_t S3Service::fetchData(uint64_t offset, char *data, uint64_t len, const s
std::copy(responseData.begin(), responseData.end(), data);
return responseData.size();
} else if (resp.getStatus() == RESPONSE_ERROR) {
xmlParserCtxtPtr xmlptr = getXMLContext(resp);
parseXMLMessage(xmlptr);
xmlParserCtxtPtr xmlContext = getXMLContext(resp);
if (xmlContext != NULL) {
XMLContextHolder holder(xmlContext);
parseXMLMessage(xmlContext);
}
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
return 0;
} else {
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
return 0;
}
}
......@@ -349,8 +357,11 @@ S3CompressionType S3Service::checkCompressionType(const string &keyUrl, const st
}
} else {
if (resp.getStatus() == RESPONSE_ERROR) {
xmlParserCtxtPtr xmlptr = getXMLContext(resp);
parseXMLMessage(xmlptr);
xmlParserCtxtPtr xmlContext = getXMLContext(resp);
if (xmlContext != NULL) {
XMLContextHolder holder(xmlContext);
parseXMLMessage(xmlContext);
}
}
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", keyUrl.c_str(),
resp.getMessage().c_str());
......
......@@ -25,16 +25,11 @@ Range OffsetMgr::getNextOffset() {
return ret;
}
ChunkBuffer::ChunkBuffer(const string& url, OffsetMgr& mgr, bool& sharedError,
const S3Credential& cred, const string& region)
: sourceUrl(url),
sharedError(sharedError),
chunkData(NULL),
offsetMgr(mgr),
credential(cred),
region(region),
s3interface(NULL) {
Range range = mgr.getNextOffset();
ChunkBuffer::ChunkBuffer(const string& url, S3KeyReader& reader)
: sourceUrl(url), offsetMgr(reader.getOffsetMgr()), sharedKeyReader(reader) {
s3interface = NULL;
chunkData = NULL;
Range range = offsetMgr.getNextOffset();
curFileOffset = range.offset;
chunkDataSize = range.length;
status = ReadyToFill;
......@@ -65,8 +60,8 @@ void ChunkBuffer::init() {
chunkData = new char[offsetMgr.getChunkSize()];
CHECK_OR_DIE_MSG(chunkData != NULL, "%s", "Failed to allocate Buffer, no enough memory?");
pthread_mutex_init(&this->stat_mutex, NULL);
pthread_cond_init(&this->stat_cond, NULL);
pthread_mutex_init(&this->statusMutex, NULL);
pthread_cond_init(&this->statusCondVar, NULL);
}
void ChunkBuffer::destroy() {
......@@ -74,8 +69,8 @@ void ChunkBuffer::destroy() {
delete this->chunkData;
this->chunkData = NULL;
pthread_mutex_destroy(&this->stat_mutex);
pthread_cond_destroy(&this->stat_cond);
pthread_mutex_destroy(&this->statusMutex);
pthread_cond_destroy(&this->statusCondVar);
}
}
......@@ -89,15 +84,17 @@ uint64_t ChunkBuffer::read(char* buf, uint64_t len) {
// ReadyToFill, second call hangs.
CHECK_OR_DIE_MSG(!QueryCancelPending, "%s", "ChunkBuffer reading is interrupted by GPDB");
pthread_mutex_lock(&this->stat_mutex);
pthread_mutex_lock(&this->statusMutex);
while (this->status != ReadyToRead) {
pthread_cond_wait(&this->stat_cond, &this->stat_mutex);
pthread_cond_wait(&this->statusCondVar, &this->statusMutex);
}
// Error is shared between all chunks.
if (this->isError()) {
pthread_mutex_unlock(&this->stat_mutex);
CHECK_OR_DIE_MSG(false, "%s", "ChunkBuffers encounter a downloading error.");
pthread_mutex_unlock(&this->statusMutex);
// Don't throw here. The failing chunk has already set the shared error message,
// which will be handled by S3KeyReader.
return 0;
}
uint64_t leftLen = this->chunkDataSize - this->curChunkOffset;
......@@ -119,20 +116,20 @@ uint64_t ChunkBuffer::read(char* buf, uint64_t len) {
this->curFileOffset = range.offset;
this->chunkDataSize = range.length;
pthread_cond_signal(&this->stat_cond);
pthread_cond_signal(&this->statusCondVar);
}
}
pthread_mutex_unlock(&this->stat_mutex);
pthread_mutex_unlock(&this->statusMutex);
return lenToRead;
}
// returning -1 means error
uint64_t ChunkBuffer::fill() {
pthread_mutex_lock(&this->stat_mutex);
pthread_mutex_lock(&this->statusMutex);
while (this->status != ReadyToFill) {
pthread_cond_wait(&this->stat_cond, &this->stat_mutex);
pthread_cond_wait(&this->statusCondVar, &this->statusMutex);
}
uint64_t offset = this->curFileOffset;
......@@ -141,11 +138,18 @@ uint64_t ChunkBuffer::fill() {
uint64_t readLen = 0;
if (leftLen != 0) {
readLen = this->s3interface->fetchData(offset, this->chunkData, leftLen, this->sourceUrl,
this->region, this->credential);
try {
readLen = this->s3interface->fetchData(
offset, this->chunkData, leftLen, this->sourceUrl,
this->sharedKeyReader.getRegion(), this->sharedKeyReader.getCredential());
} catch (std::exception& e) {
S3DEBUG("Failed to fetch expected data from S3");
this->setSharedError(true, e.what());
}
if (readLen != leftLen) {
S3DEBUG("Failed to fetch expected data from S3");
this->sharedError = true;
this->setSharedError(true, "Failed to fetch expected data from S3");
} else {
S3DEBUG("Got %" PRIu64 " bytes from S3", readLen);
}
......@@ -158,10 +162,10 @@ uint64_t ChunkBuffer::fill() {
}
this->status = ReadyToRead;
pthread_cond_signal(&this->stat_cond);
pthread_mutex_unlock(&this->stat_mutex);
pthread_cond_signal(&this->statusCondVar);
pthread_mutex_unlock(&this->statusMutex);
return (this->sharedError) ? -1 : readLen;
return (this->isError()) ? -1 : readLen;
}
void* DownloadThreadFunc(void* data) {
......@@ -174,12 +178,12 @@ void* DownloadThreadFunc(void* data) {
S3INFO("Downloading thread is interrupted by GPDB");
// error is shared between all chunks, so all chunks will stop.
buffer->setError();
buffer->setSharedError(true, "Downloading thread is interrupted by GPDB");
// have to unlock ChunkBuffer::read in some certain conditions, for instance, status is
// not ReadyToRead, and read() is waiting for signal stat_cond.
buffer->setStatus(ReadyToRead);
pthread_cond_signal(const_cast<pthread_cond_t*>(buffer->getStatCond()));
pthread_cond_signal(buffer->getStatCond());
return NULL;
}
......@@ -200,6 +204,9 @@ void* DownloadThreadFunc(void* data) {
}
void S3KeyReader::open(const ReaderParams& params) {
this->sharedError = false;
this->sharedErrorMessage.clear();
this->numOfChunks = params.getNumOfChunks();
CHECK_OR_DIE_MSG(this->numOfChunks > 0, "%s", "numOfChunks must not be zero");
......@@ -214,9 +221,7 @@ void S3KeyReader::open(const ReaderParams& params) {
for (uint64_t i = 0; i < this->numOfChunks; i++) {
// When the vector reallocates memory, it copies the objects,
// so chunkData must be initialized only after all copies are done.
this->chunkBuffers.push_back(ChunkBuffer(params.getKeyUrl(), this->offsetMgr,
this->sharedError, this->credential,
this->region));
this->chunkBuffers.push_back(ChunkBuffer(params.getKeyUrl(), *this));
}
for (uint64_t i = 0; i < this->numOfChunks; i++) {
......@@ -245,6 +250,8 @@ uint64_t S3KeyReader::read(char* buf, uint64_t count) {
readLen = buffer.read(buf, count);
CHECK_OR_DIE_MSG(!this->sharedError, "%s", this->sharedErrorMessage.c_str());
this->transferredKeyLen += readLen;
if (readLen < count) {
......
......@@ -15,10 +15,15 @@
using namespace std;
S3RESTfulService::S3RESTfulService() {
// This function is not thread safe and must NOT be called while any other
// threads are running, that is, do NOT call it from worker threads.
curl_global_init(CURL_GLOBAL_ALL);
}
S3RESTfulService::~S3RESTfulService() {
// This function is not thread safe and must NOT be called while any other
// threads are running, that is, do NOT call it from worker threads.
curl_global_cleanup();
}
// curl's write function callback.
......
......@@ -252,10 +252,10 @@ TEST_F(S3ServiceTest, fetchDataFailedResponse) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
char buffer[100];
EXPECT_EQ(0,
s3service->fetchData(0, buffer, 100,
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred));
EXPECT_THROW(s3service->fetchData(
0, buffer, 100,
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever", region, cred),
std::runtime_error);
}
TEST_F(S3ServiceTest, fetchDataPartialResponse) {
......@@ -352,7 +352,8 @@ TEST_F(S3ServiceTest, fetchDataWithResponseError) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
EXPECT_EQ(0, s3service->fetchData(
EXPECT_THROW(s3service->fetchData(
0, buf, 128, "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred));
region, cred),
std::runtime_error);
}