提交 3d134b34 编写于 作者: P Peifeng Qiu 提交者: Adam Lee

s3ext: fix a race condition and a exception bug

== race condition

Without this fix, ChunkBuffer::read() might be waiting for the signal
from ChunkBuffer::fill(), which has already been terminated in some
certain cases.

== exception bug

When a FAIL response from KeyReader is given, an exception
will be throw from S3Service::fetchData(), and this exception
will crash the downloader threads and process (core dumped).

We replace exception with error return, and it will be handled
by the caller.
Signed-off-by: NAdam Lee <ali@pivotal.io>
Signed-off-by: NKuien Liu <kliu@pivotal.io>
Signed-off-by: NHaozhou Wang <hawang@pivotal.io>
上级 33aab67d
......@@ -50,7 +50,7 @@ class OffsetMgr {
private:
pthread_mutex_t offsetLock;
uint64_t keySize;
uint64_t keySize; // size of S3 key(file)
uint64_t chunkSize;
uint64_t curPos;
};
......@@ -65,22 +65,6 @@ class ChunkBuffer {
ChunkBuffer(const string& url, OffsetMgr& mgr, bool& sharedError, const S3Credential& cred,
const string& region);
/*ChunkBuffer(const ChunkBuffer& other)
: sourceUrl(other.sourceUrl),
sharedError(other.sharedError),
offsetMgr(other.offsetMgr),
credential(other.credential),
region(other.region),
s3interface(other.s3interface) {
this->chunkData = other.chunkData;
curFileOffset = other.curFileOffset;
chunkDataSize = other.chunkDataSize;
status = other.status;
eof = other.eof;
curChunkOffset = other.curChunkOffset;
}*/
~ChunkBuffer();
bool isEOF() {
......@@ -105,6 +89,14 @@ class ChunkBuffer {
void init();
void destroy();
const pthread_cond_t* getStatCond() const {
return &stat_cond;
}
void setStatus(ChunkStatus status) {
this->status = status;
}
protected:
string sourceUrl;
......
......@@ -296,7 +296,10 @@ uint64_t S3Service::fetchData(uint64_t offset, char *data, uint64_t len, const s
Response resp = service->get(sourceUrl, headers, params);
if (resp.getStatus() == RESPONSE_OK) {
vector<uint8_t> &responseData = resp.getRawData();
CHECK_OR_DIE_MSG(responseData.size() == len, "%s", "Response is not fully received.");
if (responseData.size() != len) {
S3ERROR("%s", "Response is not fully received.");
return 0;
}
std::copy(responseData.begin(), responseData.end(), data);
return responseData.size();
......@@ -304,11 +307,10 @@ uint64_t S3Service::fetchData(uint64_t offset, char *data, uint64_t len, const s
xmlParserCtxtPtr xmlptr = getXMLContext(resp);
CHECK_OR_DIE(checkXMLMessage(xmlptr));
} else {
CHECK_OR_DIE_MSG(false, "Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
S3ERROR("Failed to fetch: %s, Response message: %s", sourceUrl.c_str(),
resp.getMessage().c_str());
return 0;
}
return 0;
}
S3CompressionType S3Service::checkCompressionType(const string &keyUrl, const string &region,
......
......@@ -11,11 +11,7 @@ Range OffsetMgr::getNextOffset() {
Range ret;
pthread_mutex_lock(&this->offsetLock);
if (this->curPos < this->keySize) {
ret.offset = this->curPos;
} else {
ret.offset = this->keySize;
}
ret.offset = std::min(this->curPos, this->keySize);
if (this->curPos + this->chunkSize > this->keySize) {
ret.length = this->keySize - this->curPos;
......@@ -53,10 +49,7 @@ ChunkBuffer::~ChunkBuffer() {
// Copy constructor will copy members, but chunkData must not be initialized before copy.
// otherwise when worked with vector it will be freed twice.
void ChunkBuffer::init() {
if (chunkData != NULL) {
S3ERROR("Error: reinitializing chunkBuffer.");
return;
}
CHECK_OR_DIE_MSG(chunkData == NULL, "%s", "Error: reinitializing chunkBuffer.");
chunkData = new char[offsetMgr.getChunkSize()];
CHECK_OR_DIE_MSG(chunkData != NULL, "%s", "Failed to allocate Buffer, no enough memory?");
......@@ -168,6 +161,12 @@ void* DownloadThreadFunc(void* data) {
// error is shared between all chunks, so all chunks will stop.
buffer->setError();
// have to unlock ChunkBuffer::read in some certain conditions, for instance, status is
// not ReadyToRead, and read() is waiting for signal stat_cond.
buffer->setStatus(ReadyToRead);
pthread_cond_signal(const_cast<pthread_cond_t*>(buffer->getStatCond()));
return NULL;
}
......
......@@ -312,10 +312,10 @@ TEST_F(S3ServiceTest, fetchDataFailedResponse) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
char buffer[100];
EXPECT_THROW(s3service->fetchData(
0, buffer, 100,
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever", region, cred),
std::runtime_error);
EXPECT_EQ(0,
s3service->fetchData(0, buffer, 100,
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred));
}
TEST_F(S3ServiceTest, fetchDataPartialResponse) {
......@@ -325,10 +325,10 @@ TEST_F(S3ServiceTest, fetchDataPartialResponse) {
EXPECT_CALL(mockRestfulService, get(_, _, _)).WillOnce(Return(response));
char buffer[100];
EXPECT_THROW(s3service->fetchData(
0, buffer, 100,
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever", region, cred),
std::runtime_error);
EXPECT_EQ(0,
s3service->fetchData(0, buffer, 100,
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/whatever",
region, cred));
}
TEST_F(S3ServiceTest, checkItsGzipCompressed) {
......
......@@ -89,6 +89,85 @@ TEST(OffsetMgr, KeySizeSmallerThanChunkSize) {
EXPECT_EQ(r.length, 0);
}
TEST(OffsetMgr, KeySizeEqualToChunkSize) {
OffsetMgr o;
o.setKeySize(127);
o.setChunkSize(127);
EXPECT_EQ(127, o.getChunkSize());
EXPECT_EQ(127, o.getKeySize());
Range r = o.getNextOffset();
EXPECT_EQ(r.offset, 0);
EXPECT_EQ(r.length, 127);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 127);
EXPECT_EQ(r.length, 0);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 127);
EXPECT_EQ(r.length, 0);
}
TEST(OffsetMgr, KeySizeIsDevidedByChunkSize) {
OffsetMgr o;
o.setKeySize(635);
o.setChunkSize(127);
EXPECT_EQ(127, o.getChunkSize());
EXPECT_EQ(635, o.getKeySize());
Range r = o.getNextOffset();
EXPECT_EQ(r.offset, 0);
EXPECT_EQ(r.length, 127);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 127);
EXPECT_EQ(r.length, 127);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 254);
EXPECT_EQ(r.length, 127);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 381);
EXPECT_EQ(r.length, 127);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 508);
EXPECT_EQ(r.length, 127);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 635);
EXPECT_EQ(r.length, 0);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 635);
EXPECT_EQ(r.length, 0);
}
TEST(OffsetMgr, KeySizeIsZero) {
OffsetMgr o;
o.setKeySize(0);
o.setChunkSize(1000);
EXPECT_EQ(1000, o.getChunkSize());
EXPECT_EQ(0, o.getKeySize());
Range r = o.getNextOffset();
EXPECT_EQ(r.offset, 0);
EXPECT_EQ(r.length, 0);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 0);
EXPECT_EQ(r.length, 0);
r = o.getNextOffset();
EXPECT_EQ(r.offset, 0);
EXPECT_EQ(r.length, 0);
}
TEST_F(S3KeyReaderTest, OpenWithZeroChunk) {
params.setNumOfChunks(0);
......@@ -366,6 +445,66 @@ TEST_F(S3KeyReaderTest, MTReadWithFragmentalReadRequests) {
EXPECT_EQ(0, keyReader->read(buffer, 31));
}
TEST_F(S3KeyReaderTest, MTReadWithHundredsOfThreads) {
params.setNumOfChunks(127);
params.setRegion("us-west-2");
params.setKeySize(1024);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _)).WillRepeatedly(Return(64));
keyReader->open(params);
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(0, keyReader->read(buffer, 31));
}
TEST_F(S3KeyReaderTest, MTReadWithFetchDataError) {
params.setNumOfChunks(3);
params.setRegion("us-west-2");
......@@ -467,4 +606,28 @@ TEST_F(S3KeyReaderTest, MTReadWithGPDBCancel) {
// QueryCancelPending in ChunkBuffer::read will always be hit and throw.
EXPECT_THROW(keyReader->read(buffer, 127), std::runtime_error);
}
\ No newline at end of file
}
TEST_F(S3KeyReaderTest, MTReadWithHundredsOfThreadsAndSignalCancel) {
params.setNumOfChunks(127);
params.setRegion("us-west-2");
params.setKeySize(1024);
params.setChunkSize(64);
EXPECT_CALL(s3interface, fetchData(_, _, _, _, _, _)).WillRepeatedly(Return(64));
keyReader->open(params);
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(31, keyReader->read(buffer, 31));
EXPECT_EQ(2, keyReader->read(buffer, 31));
QueryCancelPending = true;
EXPECT_THROW(keyReader->read(buffer, 31), std::runtime_error);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册