Commit 5c4e6b5b authored by Kuien Liu, committed by Adam Lee

s3ext: support data compression when writing data to S3

If 'autocompress' in the s3 configuration file is set to 'true', all data will
be compressed before being uploaded to S3. This reduces network traffic
significantly, which means saving money as well.

The data is compressed with 'Z_NO_FLUSH' before being injected into the underlying
S3KeyWriter's buffer, and the latter invokes the RESTful layer to finish the
upload. We don't buffer data issued from s3extprotocol in the compression layer;
that is, all data blocks are injected into the z_stream and deflated immediately.
This is because experimental results show little performance improvement from such
buffering while it consumes more memory.
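For reference, the switch lives in the s3 extension's configuration file. A minimal
sketch of a config section, assuming the usual s3ext credential fields alongside the
new 'autocompress' key (values here are placeholders):

[default]
accessid = "YOUR_ACCESS_ID"
secret = "YOUR_SECRET"
# gzip data before uploading to S3; parsed by InitConfig(), defaults to 'true'
autocompress = true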

Signed-off-by: Peifeng Qiu, Adam Lee
Parent e9f90691
......@@ -63,4 +63,13 @@ source_replaced
*.diffs
.idea
*.sln
*.suo
*.vcxproj
*.vcxproj.user
*.filters
*.VC.db
*.VC.opendb
CMakeLists.txt
......@@ -50,7 +50,7 @@ class S(BaseHTTPRequestHandler):
def do_DELETE(self):
# Just bounce the request back
print "----- SOMETHING WAS DELETE ------"
print "----- SOMETHING WAS DELETED ------"
print self.headers
length = int(self.headers['Content-Length'])
# content = self.rfile.read(length)
......
#ifndef INCLUDE_COMPRESS_WRITER_H_
#define INCLUDE_COMPRESS_WRITER_H_
#include <zlib.h>
#include "writer.h"
// 2MB by default
extern uint64_t S3_ZIP_COMPRESS_CHUNKSIZE;
class CompressWriter : public Writer {
public:
CompressWriter();
virtual ~CompressWriter();
virtual void open(const WriterParams &params);
// write() attempts to write up to count bytes from the buffer.
// Always returns 0 at EOF, no matter how many times it's invoked. Throws an exception if it
// encounters errors.
virtual uint64_t write(const char *buf, uint64_t count);
// This should be reentrant, with no side effects when called multiple times.
virtual void close();
void setWriter(Writer *writer);
private:
void flush();
Writer *writer;
// zlib related variables.
z_stream zstream;
char *out; // Output buffer for compression.
// add this flag to make close() reentrant
bool isClosed;
};
#endif
\ No newline at end of file
......@@ -5,21 +5,21 @@
#include "reader.h"
// 2MB by default
extern uint64_t S3_ZIP_CHUNKSIZE;
extern uint64_t S3_ZIP_DECOMPRESS_CHUNKSIZE;
class DecompressReader : public Reader {
public:
DecompressReader();
virtual ~DecompressReader();
void open(const ReaderParams &params);
virtual void open(const ReaderParams &params);
// read() attempts to read up to count bytes into the buffer.
// Return 0 if EOF. Throw exception if encounters errors.
uint64_t read(char *buf, uint64_t count);
virtual uint64_t read(char *buf, uint64_t count);
// This should be reentrant, with no side effects when called multiple times.
void close();
virtual void close();
void setReader(Reader *reader);
......@@ -29,7 +29,7 @@ class DecompressReader : public Reader {
void decompress();
uint64_t getDecompressedBytesNum() {
return S3_ZIP_CHUNKSIZE - this->zstream.avail_out;
return S3_ZIP_DECOMPRESS_CHUNKSIZE - this->zstream.avail_out;
}
Reader *reader;
......
......@@ -36,7 +36,6 @@ class GPReader : public Reader {
protected:
S3BucketReader bucketReader;
S3CommonReader commonReader;
DecompressReader uncomressReader;
S3RESTfulService restfulService;
S3Service s3service;
......
......@@ -4,7 +4,7 @@
#include <string.h>
#include <string>
#include "s3key_writer.h"
#include "s3common_writer.h"
#include "writer.h"
#define S3_DEFAULT_FORMAT "data"
......@@ -20,7 +20,7 @@ class GPWriter : public Writer {
// write() attempts to write up to count bytes from the buffer.
// Always returns 0 at EOF, no matter how many times it's invoked. Throws an exception if it
// encounters errors.
virtual uint64_t write(char *buf, uint64_t count);
virtual uint64_t write(const char *buf, uint64_t count);
// This should be reentrant, with no side effects when called multiple times.
virtual void close();
......@@ -39,13 +39,12 @@ class GPWriter : public Writer {
string genUniqueKeyName(const string &url);
protected:
string format;
WriterParams params;
S3RESTfulService restfulService;
S3Service s3service;
S3Credential cred;
S3KeyWriter keyWriter;
string format;
S3CommonWriter commonWriter;
// it points to itself by default,
// but the pointer here leaves a chance to mock it in unit tests
......
COMMON_OBJS = gpreader.o gpwriter.o s3conf.o s3common.o s3utils.o s3log.o s3url.o s3http_headers.o s3interface.o s3restful_service.o decompress_reader.o s3key_reader.o s3key_writer.o s3bucket_reader.o s3common_reader.o
COMMON_OBJS = gpreader.o gpwriter.o s3conf.o s3common.o s3utils.o s3log.o s3url.o s3http_headers.o s3interface.o s3restful_service.o decompress_reader.o s3key_reader.o compress_writer.o s3common_writer.o s3key_writer.o s3bucket_reader.o s3common_reader.o
COMMON_LINK_OPTIONS = -lstdc++ -lxml2 -lpthread -lcrypto -lcurl -lz
......
......@@ -17,14 +17,14 @@ enum ResponseStatus {
RESPONSE_OK, // everything is OK
RESPONSE_FAIL, // curl failed (i.e., the status is not CURLE_OK)
RESPONSE_ERROR, // server error (server return code is not 200)
RESPONSE_ABORT, // the query has been abort by user
RESPONSE_ABORT, // the query has been aborted by user
};
typedef long ResponseCode;
#define HeadResponseFail -1
// 2XX are successful responses.
// Here we deal with 200 (OK), 204 (no content) and 206 (partial content) currently.
// Here we deal with 200 (OK), 204 (no content) and 206 (partial content) currently,
// 204 is for DELETE request.
// We may move this function to RESTfulService() in future
inline bool isSuccessfulResponse(ResponseCode code) {
......
......@@ -21,6 +21,7 @@ class S3CommonReader : public Reader {
// This should be reentrant, with no side effects when called multiple times.
virtual void close();
// Used by Mock, DO NOT call it in other places.
void setS3service(S3Interface* s3service) {
this->s3service = s3service;
}
......
#ifndef INCLUDE_S3COMMON_WRITER_H_
#define INCLUDE_S3COMMON_WRITER_H_
#include "compress_writer.h"
#include "s3key_writer.h"
class S3CommonWriter : public Writer {
public:
S3CommonWriter() : upstreamWriter(NULL), s3service(NULL) {
}
virtual ~S3CommonWriter() {
}
virtual void open(const WriterParams& params);
// write() attempts to write up to count bytes from the buffer.
// Always returns 0 at EOF, no matter how many times it's invoked. Throws an exception if it
// encounters errors.
virtual uint64_t write(const char* buf, uint64_t count);
// This should be reentrant, with no side effects when called multiple times.
virtual void close();
// Used by Mock, DO NOT call it in other places.
void setS3service(S3Interface* s3service) {
this->s3service = s3service;
}
protected:
Writer* upstreamWriter;
S3Interface* s3service;
S3KeyWriter keyWriter;
CompressWriter compressWriter;
};
#endif
\ No newline at end of file
......@@ -50,5 +50,8 @@ extern struct sockaddr_in s3ext_logserveraddr;
extern int32_t s3ext_low_speed_limit;
extern int32_t s3ext_low_speed_time;
// whether to compress data before uploading
extern bool s3ext_autocompress;
// not thread safe!! call it only once.
bool InitConfig(const string &path, const string section);
......@@ -28,7 +28,7 @@ class S3KeyWriter : public Writer {
// write() attempts to write up to count bytes from the buffer.
// Always returns 0 at EOF, no matter how many times it's invoked. Throws an exception if it
// encounters errors.
virtual uint64_t write(char* buf, uint64_t count);
virtual uint64_t write(const char* buf, uint64_t count);
// This should be reentrant, with no side effects when called multiple times.
virtual void close();
......@@ -40,7 +40,7 @@ class S3KeyWriter : public Writer {
protected:
void flushBuffer();
void completeKeyWriting();
void checkQueryCancel();
void checkQueryCancelSignal();
WriterBuffer buffer;
S3Interface* s3interface;
......
......@@ -9,7 +9,7 @@
using std::string;
using std::stringstream;
#define BUFFER_LEN 1024
#define PRINTOUT_BUFFER_LEN 1024
extern void StringAppendPrintf(std::string *output, const char *format, ...);
......@@ -39,8 +39,8 @@ extern void StringAppendPrintf(std::string *output, const char *format, ...);
inline void VStringAppendPrintf(string *output, const char *format, va_list argp) {
CHECK_OR_DIE(output);
char buffer[BUFFER_LEN];
vsnprintf(buffer, BUFFER_LEN, format, argp);
char buffer[PRINTOUT_BUFFER_LEN];
vsnprintf(buffer, sizeof(buffer), format, argp);
output->append(buffer);
}
......@@ -51,4 +51,19 @@ inline void StringAppendPrintf(string *output, const char *format, ...) {
va_end(argp);
}
// chunk size for compression/decompression
// declare them here so UT tests can access them as well
extern uint64_t S3_ZIP_DECOMPRESS_CHUNKSIZE;
extern uint64_t S3_ZIP_COMPRESS_CHUNKSIZE;
#define S3_ZIP_DEFAULT_CHUNKSIZE (1024 * 1024 * 2)
// For deflate, windowBits can be greater than 15 for optional gzip encoding. Add 16 to windowBits
// to write a simple gzip header and trailer around the compressed data instead of a zlib wrapper.
#define S3_DEFLATE_WINDOWSBITS (MAX_WBITS + 16)
// For inflate, windowBits can be greater than 15 for optional gzip decoding. Add 32 to windowBits
// to enable zlib and gzip decoding with automatic header detection.
#define S3_INFLATE_WINDOWSBITS (MAX_WBITS + 16 + 16)
#endif
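As an aside, the two windowBits macros above follow zlib's documented convention:
adding 16 makes deflate emit a gzip wrapper, and adding 32 lets inflate auto-detect
zlib or gzip input. A minimal standalone round trip illustrating this convention
(a sketch only, not part of the patch):

#include <zlib.h>
#include <cassert>
#include <cstring>

int main() {
    const char src[] = "hello, s3ext";
    unsigned char gz[128], plain[128];

    // Deflate with MAX_WBITS + 16: zlib writes a gzip header and trailer.
    z_stream def = {};
    int ret = deflateInit2(&def, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8,
                           Z_DEFAULT_STRATEGY);
    assert(ret == Z_OK);
    def.next_in = (Bytef *)src;
    def.avail_in = sizeof(src);
    def.next_out = gz;
    def.avail_out = sizeof(gz);
    ret = deflate(&def, Z_FINISH);
    assert(ret == Z_STREAM_END);
    uInt gzLen = sizeof(gz) - def.avail_out;
    deflateEnd(&def);

    // Inflate with MAX_WBITS + 32: zlib auto-detects the zlib or gzip header.
    z_stream inf = {};
    ret = inflateInit2(&inf, MAX_WBITS + 32);
    assert(ret == Z_OK);
    inf.next_in = gz;
    inf.avail_in = gzLen;
    inf.next_out = plain;
    inf.avail_out = sizeof(plain);
    ret = inflate(&inf, Z_FINISH);
    assert(ret == Z_STREAM_END);
    inflateEnd(&inf);

    assert(memcmp(src, plain, sizeof(src)) == 0);
    return 0;
}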
......@@ -13,7 +13,7 @@ class Writer {
// write() attempts to write up to count bytes from the buffer.
// Always returns 0 at EOF, no matter how many times it's invoked. Throws an exception if it
// encounters errors.
virtual uint64_t write(char *buf, uint64_t count) = 0;
virtual uint64_t write(const char *buf, uint64_t count) = 0;
// This should be reentrant, with no side effects when called multiple times.
virtual void close() = 0;
......
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <string.h>
#include "compress_writer.h"
#include "s3log.h"
#include "s3macros.h"
uint64_t S3_ZIP_COMPRESS_CHUNKSIZE = S3_ZIP_DEFAULT_CHUNKSIZE;
CompressWriter::CompressWriter() : writer(NULL), isClosed(false) {
this->out = new char[S3_ZIP_COMPRESS_CHUNKSIZE];
}
CompressWriter::~CompressWriter() {
delete[] this->out;
}
void CompressWriter::open(const WriterParams& params) {
this->zstream.zalloc = Z_NULL;
this->zstream.zfree = Z_NULL;
this->zstream.opaque = Z_NULL;
// With S3_DEFLATE_WINDOWSBITS, it generates gzip stream with header and trailer
int ret = deflateInit2(&this->zstream, Z_DEFAULT_COMPRESSION, Z_DEFLATED,
S3_DEFLATE_WINDOWSBITS, 8, Z_DEFAULT_STRATEGY);
// init these fields here so that both write() and close() are ready to run
this->zstream.next_in = NULL;
this->zstream.avail_in = 0;
this->zstream.next_out = (Byte*)this->out;
this->zstream.avail_out = S3_ZIP_COMPRESS_CHUNKSIZE;
CHECK_OR_DIE_MSG(ret == Z_OK, "Failed to initialize zlib library: %s", this->zstream.msg);
this->writer->open(params);
this->isClosed = false;
}
uint64_t CompressWriter::write(const char* buf, uint64_t count) {
if (buf == NULL || count == 0) {
return 0;
}
// we assume each data block from GPDB is always smaller than the gzip chunk buffer
CHECK_OR_DIE_MSG(count < S3_ZIP_COMPRESS_CHUNKSIZE,
"Data size %" PRIu64 " is larger than S3_ZIP_COMPRESS_CHUNKSIZE", count);
this->zstream.next_in = (Byte*)buf;
this->zstream.avail_in = count;
int status = deflate(&this->zstream, Z_NO_FLUSH);
if (status < 0 && status != Z_BUF_ERROR) {
deflateEnd(&this->zstream);
CHECK_OR_DIE_MSG(false, "Failed to compress data: %d, %s", status, this->zstream.msg);
}
this->flush();
return count;
}
void CompressWriter::close() {
if (this->isClosed) {
return;
}
int status;
do {
status = deflate(&this->zstream, Z_FINISH);
this->flush();
} while (status == Z_OK);
deflateEnd(&this->zstream);
if (status != Z_STREAM_END) {
CHECK_OR_DIE_MSG(false, "Failed to finish data compression: %d, %s", status,
this->zstream.msg);
}
S3DEBUG("Compression finished: Z_STREAM_END.");
this->writer->close();
this->isClosed = true;
}
void CompressWriter::setWriter(Writer* writer) {
this->writer = writer;
}
void CompressWriter::flush() {
if (this->zstream.avail_out < S3_ZIP_COMPRESS_CHUNKSIZE) {
this->writer->write(this->out, S3_ZIP_COMPRESS_CHUNKSIZE - this->zstream.avail_out);
this->zstream.next_out = (Byte*)this->out;
this->zstream.avail_out = S3_ZIP_COMPRESS_CHUNKSIZE;
}
}
\ No newline at end of file
......@@ -8,12 +8,12 @@
#include "s3log.h"
#include "s3macros.h"
uint64_t S3_ZIP_CHUNKSIZE = 1024 * 1024 * 2;
uint64_t S3_ZIP_DECOMPRESS_CHUNKSIZE = S3_ZIP_DEFAULT_CHUNKSIZE;
DecompressReader::DecompressReader() {
this->reader = NULL;
this->in = new char[S3_ZIP_CHUNKSIZE];
this->out = new char[S3_ZIP_CHUNKSIZE];
this->in = new char[S3_ZIP_DECOMPRESS_CHUNKSIZE];
this->out = new char[S3_ZIP_DECOMPRESS_CHUNKSIZE];
this->outOffset = 0;
}
......@@ -45,12 +45,12 @@ void DecompressReader::open(const ReaderParams &params) {
zstream.next_out = (Byte *)this->out;
zstream.avail_in = 0;
zstream.avail_out = S3_ZIP_CHUNKSIZE;
zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE;
this->outOffset = 0;
// 47 is the number of windows bits, to make sure zlib could recognize and decode gzip stream.
int ret = inflateInit2(&zstream, 47);
// with S3_INFLATE_WINDOWSBITS, it could recognize and decode both zlib and gzip stream.
int ret = inflateInit2(&zstream, S3_INFLATE_WINDOWSBITS);
CHECK_OR_DIE_MSG(ret == Z_OK, "%s", "failed to initialize zlib library");
this->reader->open(params);
......@@ -59,9 +59,6 @@ void DecompressReader::open(const ReaderParams &params) {
uint64_t DecompressReader::read(char *buf, uint64_t bufSize) {
uint64_t remainingOutLen = this->getDecompressedBytesNum() - this->outOffset;
// S3DEBUG("has = %" PRIu64 ", offset = %d, chunksize = %u, avail_out = %u, count = %"
// PRIu64,
// remainingOutLen, outOffset, S3_ZIP_CHUNKSIZE, this->zstream.avail_out, bufSize);
if (remainingOutLen == 0) {
this->decompress();
this->outOffset = 0; // reset cursor for out buffer to read from beginning.
......@@ -77,16 +74,16 @@ uint64_t DecompressReader::read(char *buf, uint64_t bufSize) {
}
// Read compressed data from underlying reader and decompress to this->out buffer.
// If no more data to consume, this->zstream.avail_out == S3_ZIP_CHUNKSIZE;
// If no more data to consume, this->zstream.avail_out == S3_ZIP_DECOMPRESS_CHUNKSIZE;
void DecompressReader::decompress() {
if (this->zstream.avail_in == 0) {
this->zstream.avail_out = S3_ZIP_CHUNKSIZE;
this->zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE;
this->zstream.next_out = (Byte *)this->out;
// reader S3_ZIP_CHUNKSIZE data from underlying reader and put into this->in buffer.
// read() might happen more than one time when it's EOF, make sure every time read() will
// return 0.
uint64_t hasRead = this->reader->read(this->in, S3_ZIP_CHUNKSIZE);
// read S3_ZIP_DECOMPRESS_CHUNKSIZE data from underlying reader and put into this->in
// buffer. read() might happen more than once when reaching EOF, make sure every time read()
// will return 0.
uint64_t hasRead = this->reader->read(this->in, S3_ZIP_DECOMPRESS_CHUNKSIZE);
// EOF, no more data to decompress.
if (hasRead == 0) {
......@@ -99,8 +96,9 @@ void DecompressReader::decompress() {
// Fill this->in as much as possible; otherwise the data in this->in might not be
// inflatable.
while (hasRead < S3_ZIP_CHUNKSIZE) {
uint64_t count = this->reader->read(this->in + hasRead, S3_ZIP_CHUNKSIZE - hasRead);
while (hasRead < S3_ZIP_DECOMPRESS_CHUNKSIZE) {
uint64_t count =
this->reader->read(this->in + hasRead, S3_ZIP_DECOMPRESS_CHUNKSIZE - hasRead);
if (count == 0) {
break;
......@@ -113,25 +111,17 @@ void DecompressReader::decompress() {
this->zstream.avail_in = hasRead;
} else {
// Still have more data in 'in' buffer to decode.
this->zstream.avail_out = S3_ZIP_CHUNKSIZE;
this->zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE;
this->zstream.next_out = (Byte *)this->out;
}
// S3DEBUG("Before decompress: avail_in = %u, avail_out = %u, total_in = %u, total_out = %u",
// zstream.avail_in, zstream.avail_out, zstream.total_in, zstream.total_out);
int status = inflate(&this->zstream, Z_NO_FLUSH);
if (status == Z_STREAM_END) {
S3DEBUG("Compression finished: Z_STREAM_END.");
S3DEBUG("Decompression finished: Z_STREAM_END.");
} else if (status < 0 || status == Z_NEED_DICT) {
inflateEnd(&this->zstream);
CHECK_OR_DIE_MSG(false, "Failed to decompress data: %d", status);
}
// S3DEBUG("After decompress: avail_in = %u, avail_out = %u, total_in = %u, total_out = %u",
// zstream.avail_in, zstream.avail_out, zstream.total_in, zstream.total_out);
return;
}
void DecompressReader::close() {
......
......@@ -47,16 +47,16 @@ void GPWriter::constructWriterParams(const string& url) {
void GPWriter::open(const WriterParams& params) {
this->s3service.setRESTfulService(this->restfulServicePtr);
this->params.setKeyUrl(this->genUniqueKeyName(this->params.getBaseUrl()));
this->keyWriter.setS3interface(&this->s3service);
this->keyWriter.open(this->params);
this->commonWriter.setS3service(&this->s3service);
this->commonWriter.open(this->params);
}
uint64_t GPWriter::write(char* buf, uint64_t count) {
return this->keyWriter.write(buf, count);
uint64_t GPWriter::write(const char* buf, uint64_t count) {
return this->commonWriter.write(buf, count);
}
void GPWriter::close() {
this->keyWriter.close();
this->commonWriter.close();
}
string GPWriter::genUniqueKeyName(const string& url) {
......@@ -127,7 +127,8 @@ GPWriter* writer_init(const char* url_with_options, const char* format) {
InitRemoteLog();
writer = new GPWriter(url, format);
string extName = s3ext_autocompress ? string(format) + ".gz" : format;
writer = new GPWriter(url, extName);
if (writer == NULL) {
return NULL;
}
......
......@@ -114,8 +114,6 @@ void S3BucketReader::close() {
if (!this->keyList.contents.empty()) {
this->keyList.contents.clear();
}
return;
}
string S3BucketReader::getKeyURL(const string &key) {
......
#include "s3common_writer.h"
#include "s3macros.h"
void S3CommonWriter::open(const WriterParams& params) {
this->keyWriter.setS3interface(this->s3service);
if (s3ext_autocompress) {
this->upstreamWriter = &this->compressWriter;
this->compressWriter.setWriter(&this->keyWriter);
} else {
this->upstreamWriter = &this->keyWriter;
}
this->upstreamWriter->open(params);
}
uint64_t S3CommonWriter::write(const char* buf, uint64_t count) {
return this->upstreamWriter->write(buf, count);
}
void S3CommonWriter::close() {
this->upstreamWriter->close();
}
\ No newline at end of file
......@@ -44,6 +44,7 @@ string s3ext_token;
bool s3ext_encryption = true;
bool s3ext_debug_curl = false;
bool s3ext_autocompress = true;
int32_t s3ext_segid = -1;
int32_t s3ext_segnum = -1;
......@@ -140,6 +141,9 @@ bool InitConfig(const string& conf_path, const string section = "default") {
content = s3cfg->Get(section.c_str(), "encryption", "true");
s3ext_encryption = to_bool(content);
content = s3cfg->Get(section.c_str(), "autocompress", "true");
s3ext_autocompress = to_bool(content);
#ifdef S3_STANDALONE
s3ext_segid = 0;
s3ext_segnum = 1;
......
......@@ -492,7 +492,6 @@ string S3Service::uploadPartOfData(vector<uint8_t> &data, const string &keyUrl,
urlWithQuery << keyUrl << "?partNumber=" << partNumber << "&uploadId=" << uploadId;
Response resp = this->putResponseWithRetries(urlWithQuery.str(), headers, params, data);
if (resp.getStatus() == RESPONSE_OK) {
string headers(resp.getRawHeaders().begin(), resp.getRawHeaders().end());
......
......@@ -21,9 +21,9 @@ void S3KeyWriter::open(const WriterParams &params) {
// write() attempts to write up to count bytes from the buffer.
// Always returns 0 at EOF, no matter how many times it's invoked. Throws an exception if it
// encounters errors.
uint64_t S3KeyWriter::write(char *buf, uint64_t count) {
uint64_t S3KeyWriter::write(const char *buf, uint64_t count) {
CHECK_OR_DIE(buf != NULL);
this->checkQueryCancel();
this->checkQueryCancelSignal();
// GPDB issues blocks of 64K or less each time, while chunkSize is at least 8MB
if (count > this->chunkSize) {
......@@ -42,14 +42,14 @@ uint64_t S3KeyWriter::write(char *buf, uint64_t count) {
// This should be reentrant, with no side effects when called multiple times.
void S3KeyWriter::close() {
this->checkQueryCancel();
this->checkQueryCancelSignal();
if (!this->uploadId.empty()) {
this->completeKeyWriting();
}
}
void S3KeyWriter::checkQueryCancel() {
void S3KeyWriter::checkQueryCancelSignal() {
if (QueryCancelPending && !this->uploadId.empty()) {
this->s3interface->abortUpload(this->url, this->region, this->cred, this->uploadId);
this->etagList.clear();
......@@ -67,11 +67,11 @@ void S3KeyWriter::flushBuffer() {
this->buffer.clear();
// most time query is canceled during uploadPartOfData,
// this is the first chance to cancel and clean up upload.
// Otherwise GPDB will call with LAST_CALL but QueryCancelPending is set to false.
// and we can't detect query cancel in S3KeyWriter::close.
this->checkQueryCancel();
// Most time query is canceled during uploadPartOfData,
// This is the first chance to cancel and clean up upload.
// Otherwise GPDB will call with LAST_CALL but QueryCancelPending is set to false,
// and we can't detect query cancel signal in S3KeyWriter::close().
this->checkQueryCancelSignal();
}
}
......
......@@ -39,7 +39,7 @@ size_t RESTfulServiceWriteFuncCallback(char *ptr, size_t size, size_t nmemb, voi
return realsize;
}
// curl's write function callback, used only by DELETE request when query is canceled.
// cURL's write function callback, only used by DELETE request when query is canceled.
// It shouldn't be interrupted by QueryCancelPending.
size_t RESTfulServiceAbortFuncCallback(char *ptr, size_t size, size_t nmemb, void *userp) {
size_t realsize = size * nmemb;
......@@ -409,8 +409,7 @@ Response S3RESTfulService::deleteRequest(const string &url, HTTPHeaders &headers
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &responseCode);
// 2XX are successful responses. Here we deal with 200 (OK), 204 (no content), and 206
// (partial content)
// firstly.
// (partial content) firstly.
if (isSuccessfulResponse(responseCode)) {
response.setStatus(RESPONSE_OK);
response.setMessage("Success");
......
#include <vector>
#include "compress_writer.cpp"
#include "gtest/gtest.h"
#include "s3macros.h"
using std::vector;
class MockWriter : public Writer {
public:
MockWriter() {
}
virtual ~MockWriter() {
}
virtual void open(const WriterParams &params) {
}
virtual uint64_t write(const char *buf, uint64_t count) {
this->data.insert(this->data.end(), buf, buf + count);
return count;
}
virtual void close() {
// this->data.clear();
}
const char *getRawData() const {
return this->data.data();
}
vector<char> &getRawDataVector() {
return this->data;
}
size_t getDataSize() const {
return this->data.size();
}
private:
vector<char> data;
};
class CompressWriterTest : public testing::Test {
protected:
// Remember that SetUp() is run immediately before a test starts.
virtual void SetUp() {
compressWriter.setWriter(&writer);
compressWriter.open(params);
this->out = new Byte[S3_ZIP_DECOMPRESS_CHUNKSIZE];
}
// TearDown() is invoked immediately after a test finishes.
virtual void TearDown() {
compressWriter.close();
delete[] this->out;
}
void simpleUncompress(const char *input, uint64_t len) {
z_stream zstream;
// allocate inflate state for zlib
zstream.zalloc = Z_NULL;
zstream.zfree = Z_NULL;
zstream.opaque = Z_NULL;
int ret = inflateInit2(&zstream, S3_INFLATE_WINDOWSBITS);
CHECK_OR_DIE_MSG(ret == Z_OK, "%s", "failed to initialize zlib library");
zstream.avail_in = len;
zstream.next_in = (Byte *)input;
zstream.next_out = this->out;
zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE;
ret = inflate(&zstream, Z_FULL_FLUSH);
if (ret != Z_STREAM_END) {
S3DEBUG("Failed to uncompress sample data");
}
inflateEnd(&zstream);
}
WriterParams params;
CompressWriter compressWriter;
MockWriter writer;
Byte *out;
};
TEST_F(CompressWriterTest, AbleToInputNull) {
compressWriter.write(NULL, 0);
EXPECT_EQ(0, writer.getDataSize());
}
TEST_F(CompressWriterTest, AbleToCompressEmptyData) {
char input[10] = {0};
compressWriter.write(input, sizeof(input));
compressWriter.close();
this->simpleUncompress(writer.getRawData(), writer.getDataSize());
EXPECT_STREQ(input, (const char *)this->out);
}
TEST_F(CompressWriterTest, AbleToCompressAndCheckGZipHeader) {
char input[10] = {0};
compressWriter.write(input, sizeof(input));
compressWriter.close();
const char *header = writer.getRawData();
ASSERT_TRUE(header[0] == char(0x1f));
ASSERT_TRUE(header[1] == char(0x8b));
}
TEST_F(CompressWriterTest, CloseMultipleTimes) {
char input[10] = {0};
compressWriter.write(input, sizeof(input));
compressWriter.close();
compressWriter.close();
compressWriter.close();
compressWriter.close();
this->simpleUncompress(writer.getRawData(), writer.getDataSize());
EXPECT_STREQ(input, (const char *)this->out);
}
TEST_F(CompressWriterTest, AbleToCompressOneSmallString) {
const char input[] = "The quick brown fox jumps over the lazy dog";
compressWriter.write(input, sizeof(input));
compressWriter.close();
this->simpleUncompress(writer.getRawData(), writer.getDataSize());
EXPECT_STREQ(input, (const char *)this->out);
}
TEST_F(CompressWriterTest, AbleToWriteSeveralTimesBeforeClose) {
unsigned int i, times = 100;
const char input[] = "The quick brown fox jumps over the lazy dog";
for (i = 0; i < times; i++) {
compressWriter.write(input, sizeof(input));
}
compressWriter.close();
this->simpleUncompress(writer.getRawData(), writer.getDataSize());
for (i = 0; i < times; i++) {
ASSERT_TRUE(memcmp(input, (const char *)this->out + i * sizeof(input), sizeof(input)) == 0);
}
}
TEST_F(CompressWriterTest, AbleToWriteLargerThanCompressChunkSize) {
char *input = new char[S3_ZIP_COMPRESS_CHUNKSIZE + 1];
EXPECT_THROW(compressWriter.write(input, S3_ZIP_COMPRESS_CHUNKSIZE + 1), std::runtime_error);
delete[] input;
}
\ No newline at end of file
......@@ -2,6 +2,7 @@
#include "decompress_reader.cpp"
#include "gtest/gtest.h"
#include "s3macros.h"
using std::vector;
......@@ -59,6 +60,9 @@ class DecompressReaderTest : public testing::Test {
protected:
// Remember that SetUp() is run immediately before a test starts.
virtual void SetUp() {
// reset to default, because some tests will modify it
S3_ZIP_DECOMPRESS_CHUNKSIZE = S3_ZIP_DEFAULT_CHUNKSIZE;
// need to setup upstreamReader before open.
this->bufReader.setChunkSize(1024 * 1024 * 64);
decompressReader.setReader(&bufReader);
......@@ -68,6 +72,9 @@ class DecompressReaderTest : public testing::Test {
// TearDown() is invoked immediately after a test finishes.
virtual void TearDown() {
decompressReader.close();
// reset to default, because some tests will modify it
S3_ZIP_DECOMPRESS_CHUNKSIZE = S3_ZIP_DEFAULT_CHUNKSIZE;
}
void setBufReaderByRawData(const void *input, int len) {
......@@ -149,10 +156,10 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithSmallReadBuffer) {
// output buffer is smaller than chunk size (16 bytes).
// resize to 32 for 'in' and 'out' buffer
S3_ZIP_CHUNKSIZE = 32;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE);
S3_ZIP_DECOMPRESS_CHUNKSIZE = 32;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE);
char hello[S3_ZIP_CHUNKSIZE + 2];
char hello[S3_ZIP_DECOMPRESS_CHUNKSIZE + 2];
memset((void *)hello, 'A', sizeof(hello));
hello[sizeof(hello) - 1] = '\0';
......@@ -174,8 +181,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithSmallInternalReaderBuffer) {
// output buffer is smaller than chunk size (9 bytes).
// resize to 32 for 'in' and 'out' buffer
S3_ZIP_CHUNKSIZE = 10;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE);
S3_ZIP_DECOMPRESS_CHUNKSIZE = 10;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE);
char hello[34]; // compressing 34 'A's will produce 12 compressed bytes.
memset((void *)hello, 'A', sizeof(hello));
......@@ -192,8 +199,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithSmallInternalReaderBuffer) {
}
TEST_F(DecompressReaderTest, ReadFromOffsetForEachCall) {
S3_ZIP_CHUNKSIZE = 128;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE);
S3_ZIP_DECOMPRESS_CHUNKSIZE = 128;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE);
// Bigger chunk size, smaller read buffer from caller. Need to read from an offset on each call.
char hello[] = "abcdefghigklmnopqrstuvwxyz";
......@@ -218,10 +225,10 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithAlignedLargeReadBuffer) {
// resize to 8 for 'in' and 'out' buffer
S3_ZIP_CHUNKSIZE = 8;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE);
S3_ZIP_DECOMPRESS_CHUNKSIZE = 8;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE);
char hello[S3_ZIP_CHUNKSIZE * 2 + 2];
char hello[S3_ZIP_DECOMPRESS_CHUNKSIZE * 2 + 2];
memset((void *)hello, 'A', sizeof(hello));
hello[sizeof(hello) - 1] = '\0';
......@@ -239,16 +246,16 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithUnalignedLargeReadBuffer) {
// Test case for: optimal buffer size fill after decompression
// We need to make sure that we are filling the decompression
// buffer fully before asking for a new chunk from the read buffer
S3_ZIP_CHUNKSIZE = 8;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE);
S3_ZIP_DECOMPRESS_CHUNKSIZE = 8;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE);
char hello[S3_ZIP_CHUNKSIZE * 6 + 2];
char hello[S3_ZIP_DECOMPRESS_CHUNKSIZE * 6 + 2];
memset((void *)hello, 'A', sizeof(hello));
hello[sizeof(hello) - 1] = '\0';
setBufReaderByRawData(hello, sizeof(hello));
char outputBuffer[S3_ZIP_CHUNKSIZE * 6 + 4];
char outputBuffer[S3_ZIP_DECOMPRESS_CHUNKSIZE * 6 + 4];
uint32_t expectedLen[] = {8, 8, 8, 8, 8, 8, 2, 0};
for (uint32_t i = 0; i < sizeof(expectedLen) / sizeof(uint32_t); i++) {
......@@ -257,8 +264,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithUnalignedLargeReadBuffer) {
}
TEST_F(DecompressReaderTest, AbleToDecompressWithLargeReadBufferWithDecompressableString) {
S3_ZIP_CHUNKSIZE = 8;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE);
S3_ZIP_DECOMPRESS_CHUNKSIZE = 8;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE);
// Smaller chunk size, bigger read buffer from caller. Need to assemble multiple chunks.
char hello[] = "abcdefghigklmnopqrstuvwxyz"; // 26+1 bytes
......@@ -278,8 +285,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithLargeReadBufferWithDecompressab
}
TEST_F(DecompressReaderTest, AbleToDecompressWithSmartLargeReadBufferWithDecompressableString) {
S3_ZIP_CHUNKSIZE = 7;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE);
S3_ZIP_DECOMPRESS_CHUNKSIZE = 7;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE);
// Smaller chunk size, bigger read buffer from caller. Need to assemble multiple chunks.
char hello[] = "abcdefghigklmnopqrstuvwxyz"; // 26+1 bytes
......@@ -300,8 +307,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithSmartLargeReadBufferWithDecompr
}
TEST_F(DecompressReaderTest, AbleToDecompressWithIncorrectEncodedStream) {
S3_ZIP_CHUNKSIZE = 128;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE);
S3_ZIP_DECOMPRESS_CHUNKSIZE = 128;
decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE);
// set an incorrect encoding stream to Mock directly.
// it will produce 'Z_DATA_ERROR' when decompressing
......
#include "s3common_writer.cpp"
#include "gtest/gtest.h"
#include "mock_classes.h"
#include "s3macros.h"
#include <map>
#include <vector>
using std::vector;
using ::testing::AtLeast;
using ::testing::Return;
using ::testing::Invoke;
using ::testing::Throw;
using ::testing::_;
class MockS3InterfaceForCompressionWrite : public MockS3Interface {
public:
MockS3InterfaceForCompressionWrite() {}
const uint8_t* getRawData() const {
return this->data.data();
}
size_t getDataSize() const {
return this->data.size();
}
size_t getPartNumber() const {
return this->dataMap.size();
}
bool mockCheckKeyExistence(const string& keyUrl, const string& region,
const S3Credential& cred) {
return false;
}
string mockGetUploadId(const string& keyUrl, const string& region, const S3Credential& cred) {
return this->uploadID;
}
string mockUploadPartOfData(vector<uint8_t>& data, const string& keyUrl, const string& region,
const S3Credential& cred, uint64_t partNumber,
const string& uploadId) {
this->dataMap[partNumber] = data;
return this->uploadID;
}
bool mockCompleteMultiPart(const string& keyUrl, const string& region, const S3Credential& cred,
const string& uploadId, const vector<string>& etagArray) {
map<uint64_t, vector<uint8_t>>::iterator it;
for (it = this->dataMap.begin(); it != this->dataMap.end(); it++) {
this->data.insert(this->data.end(), it->second.begin(), it->second.end());
}
return true;
}
private:
vector<uint8_t> data;
map<uint64_t, vector<uint8_t>> dataMap;
const string uploadID = "I_am_an_uploadID";
};
class S3CommonWriteTest : public ::testing::Test, public S3CommonWriter {
protected:
// Remember that SetUp() is run immediately before a test starts.
virtual void SetUp() {
this->setS3service(&mockS3Interface);
this->out = new Byte[S3_ZIP_DECOMPRESS_CHUNKSIZE];
}
// TearDown() is invoked immediately after a test finishes.
virtual void TearDown() {
this->close();
delete[] this->out;
}
void simpleUncompress(const char* input, uint64_t len) {
z_stream zstream;
// allocate inflate state for zlib
zstream.zalloc = Z_NULL;
zstream.zfree = Z_NULL;
zstream.opaque = Z_NULL;
int ret = inflateInit2(&zstream, S3_INFLATE_WINDOWSBITS);
CHECK_OR_DIE_MSG(ret == Z_OK, "%s", "failed to initialize zlib library");
zstream.avail_in = len;
zstream.next_in = (Byte*)input;
zstream.next_out = this->out;
zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE;
ret = inflate(&zstream, Z_FULL_FLUSH);
if (ret != Z_STREAM_END) {
S3DEBUG("Failed to uncompress sample data");
}
inflateEnd(&zstream);
}
MockS3InterfaceForCompressionWrite mockS3Interface;
Byte* out;
};
TEST_F(S3CommonWriteTest, UsingGZip) {
s3ext_autocompress = true;
EXPECT_CALL(mockS3Interface, getUploadId(_, _, _))
.WillOnce(Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockGetUploadId));
EXPECT_CALL(mockS3Interface, uploadPartOfData(_, _, _, _, _, _))
.WillOnce(
Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockUploadPartOfData));
EXPECT_CALL(mockS3Interface, completeMultiPart(_, _, _, _, _))
.WillOnce(
Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockCompleteMultiPart));
WriterParams params;
params.setNumOfChunks(1);
params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1);
this->open(params);
ASSERT_EQ(this->upstreamWriter, &this->compressWriter);
ASSERT_TRUE(NULL != dynamic_cast<CompressWriter*>(this->upstreamWriter));
}
TEST_F(S3CommonWriteTest, UsingPlain) {
s3ext_autocompress = false;
EXPECT_CALL(mockS3Interface, getUploadId(_, _, _))
.WillOnce(Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockGetUploadId));
WriterParams params;
params.setNumOfChunks(1);
params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1);
this->open(params);
ASSERT_EQ(this->upstreamWriter, &this->keyWriter);
ASSERT_TRUE(NULL != dynamic_cast<S3KeyWriter*>(this->upstreamWriter));
}
TEST_F(S3CommonWriteTest, WritePlainData) {
s3ext_autocompress = false;
// EXPECT_CALL(mockS3Interface, checkKeyExistence(_, _, _))
// .WillOnce(
// Invoke(&mockS3Interface,
// &MockS3InterfaceForCompressionWrite::mockCheckKeyExistence));
EXPECT_CALL(mockS3Interface, getUploadId(_, _, _))
.WillOnce(Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockGetUploadId));
EXPECT_CALL(mockS3Interface, uploadPartOfData(_, _, _, _, _, _))
.WillOnce(
Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockUploadPartOfData));
EXPECT_CALL(mockS3Interface, completeMultiPart(_, _, _, _, _))
.WillOnce(
Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockCompleteMultiPart));
WriterParams params;
params.setNumOfChunks(1);
params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1);
this->open(params);
// 44 bytes
const char input[] = "The quick brown fox jumps over the lazy dog";
this->write(input, sizeof(input));
this->close();
EXPECT_STREQ(input, (const char*)this->mockS3Interface.getRawData());
}
TEST_F(S3CommonWriteTest, WriteGZipData) {
s3ext_autocompress = true;
EXPECT_CALL(mockS3Interface, getUploadId(_, _, _))
.WillOnce(Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockGetUploadId));
EXPECT_CALL(mockS3Interface, uploadPartOfData(_, _, _, _, _, _))
.WillOnce(
Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockUploadPartOfData));
EXPECT_CALL(mockS3Interface, completeMultiPart(_, _, _, _, _))
.WillOnce(
Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockCompleteMultiPart));
WriterParams params;
params.setNumOfChunks(1);
params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1);
this->open(params);
// 44 bytes
const char input[] = "The quick brown fox jumps over the lazy dog";
this->write(input, sizeof(input));
this->close();
this->simpleUncompress((const char*)this->mockS3Interface.getRawData(),
this->mockS3Interface.getDataSize());
EXPECT_STREQ(input, (const char*)this->out);
}
TEST_F(S3CommonWriteTest, WriteGZipDataMultipleTimes) {
s3ext_autocompress = true;
EXPECT_CALL(mockS3Interface, getUploadId(_, _, _))
.WillRepeatedly(
Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockGetUploadId));
EXPECT_CALL(mockS3Interface, uploadPartOfData(_, _, _, _, _, _))
.WillRepeatedly(
Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockUploadPartOfData));
EXPECT_CALL(mockS3Interface, completeMultiPart(_, _, _, _, _))
.WillRepeatedly(
Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockCompleteMultiPart));
WriterParams params;
params.setNumOfChunks(1);
params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1);
this->open(params);
// 44 bytes * 10
const char input[] = "The quick brown fox jumps over the lazy dog";
int times = 10;
for (int i = 0; i < times; i++) {
this->write(input, sizeof(input));
}
this->close();
this->simpleUncompress((const char*)this->mockS3Interface.getRawData(),
this->mockS3Interface.getDataSize());
for (int i = 0; i < times; i++) {
ASSERT_TRUE(memcmp(input, (const char*)this->out + i * sizeof(input), sizeof(input)) == 0);
}
}
\ No newline at end of file