From 5c4e6b5b990da391d591907a9c34e1b1c5a40af2 Mon Sep 17 00:00:00 2001 From: Kuien Liu Date: Tue, 23 Aug 2016 15:55:51 +0800 Subject: [PATCH] s3ext: support data compression when writing data to S3 If 'autocompress' in the s3 configuration file is set to 'true', all data will be compressed before being uploaded to S3. In this way, we can reduce the network traffic significantly, which means money-saving as well. The data will be compressed in 'NO_FLUSH' mode before being injected into the underlying S3KeyWriter's buffer, and the latter will invoke the RESTful layer to finish data uploading. We don't buffer data issued from s3extprotocol in the compression layer, that is, all data blocks will be injected into ZStream to deflate immediately. It is because experimental results show little performance improvement by doing this while consuming more memory. Signed-off-by: Peifeng Qiu, Adam Lee --- gpAux/extensions/gps3ext/.gitignore | 9 + .../extensions/gps3ext/bin/dummyHTTPServer.py | 2 +- .../gps3ext/include/compress_writer.h | 40 +++ .../gps3ext/include/decompress_reader.h | 10 +- gpAux/extensions/gps3ext/include/gpreader.h | 1 - gpAux/extensions/gps3ext/include/gpwriter.h | 9 +- gpAux/extensions/gps3ext/include/makefile.inc | 2 +- .../gps3ext/include/restful_service.h | 4 +- .../gps3ext/include/s3common_reader.h | 1 + .../gps3ext/include/s3common_writer.h | 37 +++ gpAux/extensions/gps3ext/include/s3conf.h | 3 + .../extensions/gps3ext/include/s3key_writer.h | 4 +- gpAux/extensions/gps3ext/include/s3macros.h | 21 +- gpAux/extensions/gps3ext/include/writer.h | 2 +- .../gps3ext/src/compress_writer.cpp | 97 +++++++ .../gps3ext/src/decompress_reader.cpp | 44 ++-- gpAux/extensions/gps3ext/src/gpwriter.cpp | 13 +- .../gps3ext/src/s3bucket_reader.cpp | 2 - .../gps3ext/src/s3common_writer.cpp | 23 ++ gpAux/extensions/gps3ext/src/s3conf.cpp | 4 + gpAux/extensions/gps3ext/src/s3interface.cpp | 1 - gpAux/extensions/gps3ext/src/s3key_writer.cpp | 18 +- .../gps3ext/src/s3restful_service.cpp | 5 +-
.../gps3ext/test/compress_writer_test.cpp | 162 ++++++++++++ .../gps3ext/test/decompress_reader_test.cpp | 47 ++-- .../gps3ext/test/s3common_writer_test.cpp | 237 ++++++++++++++++++ 26 files changed, 709 insertions(+), 89 deletions(-) create mode 100644 gpAux/extensions/gps3ext/include/compress_writer.h create mode 100644 gpAux/extensions/gps3ext/include/s3common_writer.h create mode 100644 gpAux/extensions/gps3ext/src/compress_writer.cpp create mode 100644 gpAux/extensions/gps3ext/src/s3common_writer.cpp create mode 100644 gpAux/extensions/gps3ext/test/compress_writer_test.cpp create mode 100644 gpAux/extensions/gps3ext/test/s3common_writer_test.cpp diff --git a/gpAux/extensions/gps3ext/.gitignore b/gpAux/extensions/gps3ext/.gitignore index 8f2153d43a..1fa398c8e0 100644 --- a/gpAux/extensions/gps3ext/.gitignore +++ b/gpAux/extensions/gps3ext/.gitignore @@ -63,4 +63,13 @@ source_replaced *.diffs .idea + +*.sln +*.suo +*.vcxproj +*.vcxproj.user +*.filters +*.VC.db +*.VC.opendb + CMakeLists.txt diff --git a/gpAux/extensions/gps3ext/bin/dummyHTTPServer.py b/gpAux/extensions/gps3ext/bin/dummyHTTPServer.py index 168cab5745..0cfeafbc4d 100755 --- a/gpAux/extensions/gps3ext/bin/dummyHTTPServer.py +++ b/gpAux/extensions/gps3ext/bin/dummyHTTPServer.py @@ -50,7 +50,7 @@ class S(BaseHTTPRequestHandler): def do_DELETE(self): # Just bounce the request back - print "----- SOMETHING WAS DELETE ------" + print "----- SOMETHING WAS DELETED ------" print self.headers length = int(self.headers['Content-Length']) # content = self.rfile.read(length) diff --git a/gpAux/extensions/gps3ext/include/compress_writer.h b/gpAux/extensions/gps3ext/include/compress_writer.h new file mode 100644 index 0000000000..fb1cdfb939 --- /dev/null +++ b/gpAux/extensions/gps3ext/include/compress_writer.h @@ -0,0 +1,40 @@ +#ifndef INCLUDE_COMPRESS_WRITER_H_ +#define INCLUDE_COMPRESS_WRITER_H_ + +#include +#include "writer.h" + +// 2MB by default +extern uint64_t S3_ZIP_COMPRESS_CHUNKSIZE; + +class 
CompressWriter : public Writer { + public: + CompressWriter(); + virtual ~CompressWriter(); + + virtual void open(const WriterParams ¶ms); + + // write() attempts to write up to count bytes from the buffer. + // Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters + // errors. + virtual uint64_t write(const char *buf, uint64_t count); + + // This should be reentrant, has no side effects when called multiple times. + virtual void close(); + + void setWriter(Writer *writer); + + private: + void flush(); + + Writer *writer; + + // zlib related variables. + z_stream zstream; + char *out; // Output buffer for compression. + + // add this flag to make close() reentrant + bool isClosed; +}; + +#endif \ No newline at end of file diff --git a/gpAux/extensions/gps3ext/include/decompress_reader.h b/gpAux/extensions/gps3ext/include/decompress_reader.h index c9da9243ca..781323c939 100644 --- a/gpAux/extensions/gps3ext/include/decompress_reader.h +++ b/gpAux/extensions/gps3ext/include/decompress_reader.h @@ -5,21 +5,21 @@ #include "reader.h" // 2MB by default -extern uint64_t S3_ZIP_CHUNKSIZE; +extern uint64_t S3_ZIP_DECOMPRESS_CHUNKSIZE; class DecompressReader : public Reader { public: DecompressReader(); virtual ~DecompressReader(); - void open(const ReaderParams ¶ms); + virtual void open(const ReaderParams ¶ms); // read() attempts to read up to count bytes into the buffer. // Return 0 if EOF. Throw exception if encounters errors. - uint64_t read(char *buf, uint64_t count); + virtual uint64_t read(char *buf, uint64_t count); // This should be reentrant, has no side effects when called multiple times. 
- void close(); + virtual void close(); void setReader(Reader *reader); @@ -29,7 +29,7 @@ class DecompressReader : public Reader { void decompress(); uint64_t getDecompressedBytesNum() { - return S3_ZIP_CHUNKSIZE - this->zstream.avail_out; + return S3_ZIP_DECOMPRESS_CHUNKSIZE - this->zstream.avail_out; } Reader *reader; diff --git a/gpAux/extensions/gps3ext/include/gpreader.h b/gpAux/extensions/gps3ext/include/gpreader.h index d9b65ba401..cc2fd3a43f 100644 --- a/gpAux/extensions/gps3ext/include/gpreader.h +++ b/gpAux/extensions/gps3ext/include/gpreader.h @@ -36,7 +36,6 @@ class GPReader : public Reader { protected: S3BucketReader bucketReader; S3CommonReader commonReader; - DecompressReader uncomressReader; S3RESTfulService restfulService; S3Service s3service; diff --git a/gpAux/extensions/gps3ext/include/gpwriter.h b/gpAux/extensions/gps3ext/include/gpwriter.h index 0b0a8f08fd..238fcf1bc4 100644 --- a/gpAux/extensions/gps3ext/include/gpwriter.h +++ b/gpAux/extensions/gps3ext/include/gpwriter.h @@ -4,7 +4,7 @@ #include #include -#include "s3key_writer.h" +#include "s3common_writer.h" #include "writer.h" #define S3_DEFAULT_FORMAT "data" @@ -20,7 +20,7 @@ class GPWriter : public Writer { // write() attempts to write up to count bytes from the buffer. // Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters // errors. - virtual uint64_t write(char *buf, uint64_t count); + virtual uint64_t write(const char *buf, uint64_t count); // This should be reentrant, has no side effects when called multiple times. 
virtual void close(); @@ -39,13 +39,12 @@ class GPWriter : public Writer { string genUniqueKeyName(const string &url); protected: + string format; WriterParams params; S3RESTfulService restfulService; S3Service s3service; S3Credential cred; - - S3KeyWriter keyWriter; - string format; + S3CommonWriter commonWriter; // it links to itself by default // but the pointer here leaves a chance to mock it in unit test diff --git a/gpAux/extensions/gps3ext/include/makefile.inc b/gpAux/extensions/gps3ext/include/makefile.inc index a40cb38ca1..782a044c64 100644 --- a/gpAux/extensions/gps3ext/include/makefile.inc +++ b/gpAux/extensions/gps3ext/include/makefile.inc @@ -1,4 +1,4 @@ -COMMON_OBJS = gpreader.o gpwriter.o s3conf.o s3common.o s3utils.o s3log.o s3url.o s3http_headers.o s3interface.o s3restful_service.o decompress_reader.o s3key_reader.o s3key_writer.o s3bucket_reader.o s3common_reader.o +COMMON_OBJS = gpreader.o gpwriter.o s3conf.o s3common.o s3utils.o s3log.o s3url.o s3http_headers.o s3interface.o s3restful_service.o decompress_reader.o s3key_reader.o compress_writer.o s3common_writer.o s3key_writer.o s3bucket_reader.o s3common_reader.o COMMON_LINK_OPTIONS = -lstdc++ -lxml2 -lpthread -lcrypto -lcurl -lz diff --git a/gpAux/extensions/gps3ext/include/restful_service.h b/gpAux/extensions/gps3ext/include/restful_service.h index 689387b321..ffe3aff967 100644 --- a/gpAux/extensions/gps3ext/include/restful_service.h +++ b/gpAux/extensions/gps3ext/include/restful_service.h @@ -17,14 +17,14 @@ enum ResponseStatus { RESPONSE_OK, // everything is OK RESPONSE_FAIL, // curl failed (i.e., the status is not CURLE_OK) RESPONSE_ERROR, // server error (server return code is not 200) - RESPONSE_ABORT, // the query has been abort by user + RESPONSE_ABORT, // the query has been aborted by user }; typedef long ResponseCode; #define HeadResponseFail -1 // 2XX are successful response. -// Here we deal with 200 (OK), 204 (no content) and 206 (partial content) currently. 
+// Here we deal with 200 (OK), 204 (no content) and 206 (partial content) currently, // 204 is for DELETE request. // We may move this function to RESTfulService() in future inline bool isSuccessfulResponse(ResponseCode code) { diff --git a/gpAux/extensions/gps3ext/include/s3common_reader.h b/gpAux/extensions/gps3ext/include/s3common_reader.h index ee916bdfec..952051215e 100644 --- a/gpAux/extensions/gps3ext/include/s3common_reader.h +++ b/gpAux/extensions/gps3ext/include/s3common_reader.h @@ -21,6 +21,7 @@ class S3CommonReader : public Reader { // This should be reentrant, has no side effects when called multiple times. virtual void close(); + // Used by Mock, DO NOT call it in other places. void setS3service(S3Interface* s3service) { this->s3service = s3service; } diff --git a/gpAux/extensions/gps3ext/include/s3common_writer.h b/gpAux/extensions/gps3ext/include/s3common_writer.h new file mode 100644 index 0000000000..0d7089d3db --- /dev/null +++ b/gpAux/extensions/gps3ext/include/s3common_writer.h @@ -0,0 +1,37 @@ +#ifndef INCLUDE_S3COMMON_WRITER_H_ +#define INCLUDE_S3COMMON_WRITER_H_ + +#include "compress_writer.h" +#include "s3key_writer.h" + +class S3CommonWriter : public Writer { + public: + S3CommonWriter() : upstreamWriter(NULL), s3service(NULL) { + } + + virtual ~S3CommonWriter() { + } + + virtual void open(const WriterParams& params); + + // write() attempts to write up to count bytes from the buffer. + // Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters + // errors. + virtual uint64_t write(const char* buf, uint64_t count); + + // This should be reentrant, has no side effects when called multiple times. + virtual void close(); + + // Used by Mock, DO NOT call it in other places. 
+ void setS3service(S3Interface* s3service) { + this->s3service = s3service; + } + + protected: + Writer* upstreamWriter; + S3Interface* s3service; + S3KeyWriter keyWriter; + CompressWriter compressWriter; +}; + +#endif \ No newline at end of file diff --git a/gpAux/extensions/gps3ext/include/s3conf.h b/gpAux/extensions/gps3ext/include/s3conf.h index d9ccc23363..987b1ce8c0 100644 --- a/gpAux/extensions/gps3ext/include/s3conf.h +++ b/gpAux/extensions/gps3ext/include/s3conf.h @@ -50,5 +50,8 @@ extern struct sockaddr_in s3ext_logserveraddr; extern int32_t s3ext_low_speed_limit; extern int32_t s3ext_low_speed_time; +// whether to compress data before uploading +extern bool s3ext_autocompress; + // not thread safe!! call it only once. bool InitConfig(const string &path, const string section); diff --git a/gpAux/extensions/gps3ext/include/s3key_writer.h b/gpAux/extensions/gps3ext/include/s3key_writer.h index f16757b5e2..0d12444f16 100644 --- a/gpAux/extensions/gps3ext/include/s3key_writer.h +++ b/gpAux/extensions/gps3ext/include/s3key_writer.h @@ -28,7 +28,7 @@ class S3KeyWriter : public Writer { // write() attempts to write up to count bytes from the buffer. // Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters // errors. - virtual uint64_t write(char* buf, uint64_t count); + virtual uint64_t write(const char* buf, uint64_t count); // This should be reentrant, has no side effects when called multiple times. 
virtual void close(); @@ -40,7 +40,7 @@ class S3KeyWriter : public Writer { protected: void flushBuffer(); void completeKeyWriting(); - void checkQueryCancel(); + void checkQueryCancelSignal(); WriterBuffer buffer; S3Interface* s3interface; diff --git a/gpAux/extensions/gps3ext/include/s3macros.h b/gpAux/extensions/gps3ext/include/s3macros.h index 448a793731..e937a08e90 100644 --- a/gpAux/extensions/gps3ext/include/s3macros.h +++ b/gpAux/extensions/gps3ext/include/s3macros.h @@ -9,7 +9,7 @@ using std::string; using std::stringstream; -#define BUFFER_LEN 1024 +#define PRINTOUT_BUFFER_LEN 1024 extern void StringAppendPrintf(std::string *output, const char *format, ...); @@ -39,8 +39,8 @@ extern void StringAppendPrintf(std::string *output, const char *format, ...); inline void VStringAppendPrintf(string *output, const char *format, va_list argp) { CHECK_OR_DIE(output); - char buffer[BUFFER_LEN]; - vsnprintf(buffer, BUFFER_LEN, format, argp); + char buffer[PRINTOUT_BUFFER_LEN]; + vsnprintf(buffer, sizeof(buffer), format, argp); output->append(buffer); } @@ -51,4 +51,19 @@ inline void StringAppendPrintf(string *output, const char *format, ...) { va_end(argp); } +// chunk size for compression/decompression +// declare them here so UT tests can access each as well +extern uint64_t S3_ZIP_DECOMPRESS_CHUNKSIZE; +extern uint64_t S3_ZIP_COMPRESS_CHUNKSIZE; + +#define S3_ZIP_DEFAULT_CHUNKSIZE (1024 * 1024 * 2) + +// For deflate, windowBits can be greater than 15 for optional gzip encoding. Add 16 to windowBits +// to write a simple gzip header and trailer around the compressed data instead of a zlib wrapper. +#define S3_DEFLATE_WINDOWSBITS (MAX_WBITS + 16) + +// For inflate, windowBits can be greater than 15 for optional gzip decoding. Add 32 to windowBits +// to enable zlib and gzip decoding with automatic header detection. 
+#define S3_INFLATE_WINDOWSBITS (MAX_WBITS + 16 + 16) + #endif diff --git a/gpAux/extensions/gps3ext/include/writer.h b/gpAux/extensions/gps3ext/include/writer.h index eb754bfd9d..c221cfb854 100644 --- a/gpAux/extensions/gps3ext/include/writer.h +++ b/gpAux/extensions/gps3ext/include/writer.h @@ -13,7 +13,7 @@ class Writer { // write() attempts to write up to count bytes from the buffer. // Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters // errors. - virtual uint64_t write(char *buf, uint64_t count) = 0; + virtual uint64_t write(const char *buf, uint64_t count) = 0; // This should be reentrant, has no side effects when called multiple times. virtual void close() = 0; diff --git a/gpAux/extensions/gps3ext/src/compress_writer.cpp b/gpAux/extensions/gps3ext/src/compress_writer.cpp new file mode 100644 index 0000000000..d573b57b7d --- /dev/null +++ b/gpAux/extensions/gps3ext/src/compress_writer.cpp @@ -0,0 +1,97 @@ +#define __STDC_FORMAT_MACROS +#include +#include + +#include "compress_writer.h" +#include "s3log.h" +#include "s3macros.h" + +uint64_t S3_ZIP_COMPRESS_CHUNKSIZE = S3_ZIP_DEFAULT_CHUNKSIZE; + +CompressWriter::CompressWriter() : writer(NULL), isClosed(false) { + this->out = new char[S3_ZIP_COMPRESS_CHUNKSIZE]; +} + +CompressWriter::~CompressWriter() { + delete[] this->out; +} + +void CompressWriter::open(const WriterParams& params) { + this->zstream.zalloc = Z_NULL; + this->zstream.zfree = Z_NULL; + this->zstream.opaque = Z_NULL; + + // With S3_DEFLATE_WINDOWSBITS, it generates gzip stream with header and trailer + int ret = deflateInit2(&this->zstream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, + S3_DEFLATE_WINDOWSBITS, 8, Z_DEFAULT_STRATEGY); + + // init them here to get ready for both writer() and close() + this->zstream.next_in = NULL; + this->zstream.avail_in = 0; + this->zstream.next_out = (Byte*)this->out; + this->zstream.avail_out = S3_ZIP_COMPRESS_CHUNKSIZE; + + CHECK_OR_DIE_MSG(ret == Z_OK, "Failed to initialize
zlib library: %s", this->zstream.msg); + + this->writer->open(params); + this->isClosed = false; +} + +uint64_t CompressWriter::write(const char* buf, uint64_t count) { + if (buf == NULL || count == 0) { + return 0; + } + + // we assume data block from GPDB is always smaller than gzip chunkbuffer + CHECK_OR_DIE_MSG(count < S3_ZIP_COMPRESS_CHUNKSIZE, + "Data size %" PRIu64 " is larger than S3_ZIP_COMPRESS_CHUNKSIZE", count); + + this->zstream.next_in = (Byte*)buf; + this->zstream.avail_in = count; + + int status = deflate(&this->zstream, Z_NO_FLUSH); + if (status < 0 && status != Z_BUF_ERROR) { + deflateEnd(&this->zstream); + CHECK_OR_DIE_MSG(false, "Failed to compress data: %d, %s", status, this->zstream.msg); + } + + this->flush(); + + return count; +} + +void CompressWriter::close() { + if (this->isClosed) { + return; + } + + int status; + do { + status = deflate(&this->zstream, Z_FINISH); + this->flush(); + } while (status == Z_OK); + + deflateEnd(&this->zstream); + + if (status != Z_STREAM_END) { + CHECK_OR_DIE_MSG(false, "Failed to finish data compression: %d, %s", status, + this->zstream.msg); + } + + S3DEBUG("Compression finished: Z_STREAM_END."); + + this->writer->close(); + this->isClosed = true; +} + +void CompressWriter::setWriter(Writer* writer) { + this->writer = writer; +} + +void CompressWriter::flush() { + if (this->zstream.avail_out < S3_ZIP_COMPRESS_CHUNKSIZE) { + this->writer->write(this->out, S3_ZIP_COMPRESS_CHUNKSIZE - this->zstream.avail_out); + this->zstream.next_out = (Byte*)this->out; + this->zstream.avail_out = S3_ZIP_COMPRESS_CHUNKSIZE; + } +} \ No newline at end of file diff --git a/gpAux/extensions/gps3ext/src/decompress_reader.cpp b/gpAux/extensions/gps3ext/src/decompress_reader.cpp index e83de386dc..c38692186f 100644 --- a/gpAux/extensions/gps3ext/src/decompress_reader.cpp +++ b/gpAux/extensions/gps3ext/src/decompress_reader.cpp @@ -8,12 +8,12 @@ #include "s3log.h" #include "s3macros.h" -uint64_t S3_ZIP_CHUNKSIZE = 1024 * 1024 * 2;
+uint64_t S3_ZIP_DECOMPRESS_CHUNKSIZE = S3_ZIP_DEFAULT_CHUNKSIZE; DecompressReader::DecompressReader() { this->reader = NULL; - this->in = new char[S3_ZIP_CHUNKSIZE]; - this->out = new char[S3_ZIP_CHUNKSIZE]; + this->in = new char[S3_ZIP_DECOMPRESS_CHUNKSIZE]; + this->out = new char[S3_ZIP_DECOMPRESS_CHUNKSIZE]; this->outOffset = 0; } @@ -45,12 +45,12 @@ void DecompressReader::open(const ReaderParams ¶ms) { zstream.next_out = (Byte *)this->out; zstream.avail_in = 0; - zstream.avail_out = S3_ZIP_CHUNKSIZE; + zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE; this->outOffset = 0; - // 47 is the number of windows bits, to make sure zlib could recognize and decode gzip stream. - int ret = inflateInit2(&zstream, 47); + // with S3_INFLATE_WINDOWSBITS, it could recognize and decode both zlib and gzip stream. + int ret = inflateInit2(&zstream, S3_INFLATE_WINDOWSBITS); CHECK_OR_DIE_MSG(ret == Z_OK, "%s", "failed to initialize zlib library"); this->reader->open(params); @@ -59,9 +59,6 @@ void DecompressReader::open(const ReaderParams ¶ms) { uint64_t DecompressReader::read(char *buf, uint64_t bufSize) { uint64_t remainingOutLen = this->getDecompressedBytesNum() - this->outOffset; - // S3DEBUG("has = %" PRIu64 ", offset = %d, chunksize = %u, avail_out = %u, count = %" - // PRIu64, - // remainingOutLen, outOffset, S3_ZIP_CHUNKSIZE, this->zstream.avail_out, bufSize); if (remainingOutLen == 0) { this->decompress(); this->outOffset = 0; // reset cursor for out buffer to read from beginning. @@ -77,16 +74,16 @@ uint64_t DecompressReader::read(char *buf, uint64_t bufSize) { } // Read compressed data from underlying reader and decompress to this->out buffer. 
-// If no more data to consume, this->zstream.avail_out == S3_ZIP_CHUNKSIZE; +// If no more data to consume, this->zstream.avail_out == S3_ZIP_DECOMPRESS_CHUNKSIZE; void DecompressReader::decompress() { if (this->zstream.avail_in == 0) { - this->zstream.avail_out = S3_ZIP_CHUNKSIZE; + this->zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE; this->zstream.next_out = (Byte *)this->out; - // reader S3_ZIP_CHUNKSIZE data from underlying reader and put into this->in buffer. - // read() might happen more than one time when it's EOF, make sure every time read() will - // return 0. - uint64_t hasRead = this->reader->read(this->in, S3_ZIP_CHUNKSIZE); + // read S3_ZIP_DECOMPRESS_CHUNKSIZE data from underlying reader and put into this->in + // buffer. read() might happen more than once when reaching EOF, make sure every time read() + // will return 0. + uint64_t hasRead = this->reader->read(this->in, S3_ZIP_DECOMPRESS_CHUNKSIZE); // EOF, no more data to decompress. if (hasRead == 0) { @@ -99,8 +96,9 @@ void DecompressReader::decompress() { // Fill this->in as possible as it could, otherwise data in this->in might not be able to be // inflated. - while (hasRead < S3_ZIP_CHUNKSIZE) { - uint64_t count = this->reader->read(this->in + hasRead, S3_ZIP_CHUNKSIZE - hasRead); + while (hasRead < S3_ZIP_DECOMPRESS_CHUNKSIZE) { + uint64_t count = + this->reader->read(this->in + hasRead, S3_ZIP_DECOMPRESS_CHUNKSIZE - hasRead); if (count == 0) { break; @@ -113,25 +111,17 @@ void DecompressReader::decompress() { this->zstream.avail_in = hasRead; } else { // Still have more data in 'in' buffer to decode. 
- this->zstream.avail_out = S3_ZIP_CHUNKSIZE; + this->zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE; this->zstream.next_out = (Byte *)this->out; } - // S3DEBUG("Before decompress: avail_in = %u, avail_out = %u, total_in = %u, total_out = %u", - // zstream.avail_in, zstream.avail_out, zstream.total_in, zstream.total_out); - int status = inflate(&this->zstream, Z_NO_FLUSH); if (status == Z_STREAM_END) { - S3DEBUG("Compression finished: Z_STREAM_END."); + S3DEBUG("Decompression finished: Z_STREAM_END."); } else if (status < 0 || status == Z_NEED_DICT) { inflateEnd(&this->zstream); CHECK_OR_DIE_MSG(false, "Failed to decompress data: %d", status); } - - // S3DEBUG("After decompress: avail_in = %u, avail_out = %u, total_in = %u, total_out = %u", - // zstream.avail_in, zstream.avail_out, zstream.total_in, zstream.total_out); - - return; } void DecompressReader::close() { diff --git a/gpAux/extensions/gps3ext/src/gpwriter.cpp b/gpAux/extensions/gps3ext/src/gpwriter.cpp index ab805e10cc..66f642334b 100644 --- a/gpAux/extensions/gps3ext/src/gpwriter.cpp +++ b/gpAux/extensions/gps3ext/src/gpwriter.cpp @@ -47,16 +47,16 @@ void GPWriter::constructWriterParams(const string& url) { void GPWriter::open(const WriterParams& params) { this->s3service.setRESTfulService(this->restfulServicePtr); this->params.setKeyUrl(this->genUniqueKeyName(this->params.getBaseUrl())); - this->keyWriter.setS3interface(&this->s3service); - this->keyWriter.open(this->params); + this->commonWriter.setS3service(&this->s3service); + this->commonWriter.open(this->params); } -uint64_t GPWriter::write(char* buf, uint64_t count) { - return this->keyWriter.write(buf, count); +uint64_t GPWriter::write(const char* buf, uint64_t count) { + return this->commonWriter.write(buf, count); } void GPWriter::close() { - this->keyWriter.close(); + this->commonWriter.close(); } string GPWriter::genUniqueKeyName(const string& url) { @@ -127,7 +127,8 @@ GPWriter* writer_init(const char* url_with_options, const char* format) 
{ InitRemoteLog(); - writer = new GPWriter(url, format); + string extName = s3ext_autocompress ? string(format) + ".gz" : format; + writer = new GPWriter(url, extName); if (writer == NULL) { return NULL; } diff --git a/gpAux/extensions/gps3ext/src/s3bucket_reader.cpp b/gpAux/extensions/gps3ext/src/s3bucket_reader.cpp index 2c85cfdf00..bddc46ce15 100644 --- a/gpAux/extensions/gps3ext/src/s3bucket_reader.cpp +++ b/gpAux/extensions/gps3ext/src/s3bucket_reader.cpp @@ -114,8 +114,6 @@ void S3BucketReader::close() { if (!this->keyList.contents.empty()) { this->keyList.contents.clear(); } - - return; } string S3BucketReader::getKeyURL(const string &key) { diff --git a/gpAux/extensions/gps3ext/src/s3common_writer.cpp b/gpAux/extensions/gps3ext/src/s3common_writer.cpp new file mode 100644 index 0000000000..3612423ce7 --- /dev/null +++ b/gpAux/extensions/gps3ext/src/s3common_writer.cpp @@ -0,0 +1,23 @@ +#include "s3common_writer.h" +#include "s3macros.h" + +void S3CommonWriter::open(const WriterParams& params) { + this->keyWriter.setS3interface(this->s3service); + + if (s3ext_autocompress) { + this->upstreamWriter = &this->compressWriter; + this->compressWriter.setWriter(&this->keyWriter); + } else { + this->upstreamWriter = &this->keyWriter; + } + + this->upstreamWriter->open(params); +} + +uint64_t S3CommonWriter::write(const char* buf, uint64_t count) { + return this->upstreamWriter->write(buf, count); +} + +void S3CommonWriter::close() { + this->upstreamWriter->close(); +} \ No newline at end of file diff --git a/gpAux/extensions/gps3ext/src/s3conf.cpp b/gpAux/extensions/gps3ext/src/s3conf.cpp index 95f1f2f0ee..3f3d6bc8a4 100644 --- a/gpAux/extensions/gps3ext/src/s3conf.cpp +++ b/gpAux/extensions/gps3ext/src/s3conf.cpp @@ -44,6 +44,7 @@ string s3ext_token; bool s3ext_encryption = true; bool s3ext_debug_curl = false; +bool s3ext_autocompress = true; int32_t s3ext_segid = -1; int32_t s3ext_segnum = -1; @@ -140,6 +141,9 @@ bool InitConfig(const string& conf_path, const 
string section = "default") { content = s3cfg->Get(section.c_str(), "encryption", "true"); s3ext_encryption = to_bool(content); + content = s3cfg->Get(section.c_str(), "autocompress", "true"); + s3ext_autocompress = to_bool(content); + #ifdef S3_STANDALONE s3ext_segid = 0; s3ext_segnum = 1; diff --git a/gpAux/extensions/gps3ext/src/s3interface.cpp b/gpAux/extensions/gps3ext/src/s3interface.cpp index b79f063648..e71c9c37f1 100644 --- a/gpAux/extensions/gps3ext/src/s3interface.cpp +++ b/gpAux/extensions/gps3ext/src/s3interface.cpp @@ -492,7 +492,6 @@ string S3Service::uploadPartOfData(vector &data, const string &keyUrl, urlWithQuery << keyUrl << "?partNumber=" << partNumber << "&uploadId=" << uploadId; Response resp = this->putResponseWithRetries(urlWithQuery.str(), headers, params, data); - if (resp.getStatus() == RESPONSE_OK) { string headers(resp.getRawHeaders().begin(), resp.getRawHeaders().end()); diff --git a/gpAux/extensions/gps3ext/src/s3key_writer.cpp b/gpAux/extensions/gps3ext/src/s3key_writer.cpp index 393cb36018..8dfdd25e5f 100644 --- a/gpAux/extensions/gps3ext/src/s3key_writer.cpp +++ b/gpAux/extensions/gps3ext/src/s3key_writer.cpp @@ -21,9 +21,9 @@ void S3KeyWriter::open(const WriterParams ¶ms) { // write() attempts to write up to count bytes from the buffer. // Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters // errors. -uint64_t S3KeyWriter::write(char *buf, uint64_t count) { +uint64_t S3KeyWriter::write(const char *buf, uint64_t count) { CHECK_OR_DIE(buf != NULL); - this->checkQueryCancel(); + this->checkQueryCancelSignal(); // GPDB issues 64K- block every time and chunkSize is 8MB+ if (count > this->chunkSize) { @@ -42,14 +42,14 @@ uint64_t S3KeyWriter::write(char *buf, uint64_t count) { // This should be reentrant, has no side effects when called multiple times. 
void S3KeyWriter::close() { - this->checkQueryCancel(); + this->checkQueryCancelSignal(); if (!this->uploadId.empty()) { this->completeKeyWriting(); } } -void S3KeyWriter::checkQueryCancel() { +void S3KeyWriter::checkQueryCancelSignal() { if (QueryCancelPending && !this->uploadId.empty()) { this->s3interface->abortUpload(this->url, this->region, this->cred, this->uploadId); this->etagList.clear(); @@ -67,11 +67,11 @@ void S3KeyWriter::flushBuffer() { this->buffer.clear(); - // most time query is canceled during uploadPartOfData, - // this is the first chance to cancel and clean up upload. - // Otherwise GPDB will call with LAST_CALL but QueryCancelPending is set to false. - // and we can't detect query cancel in S3KeyWriter::close. - this->checkQueryCancel(); + // Most time query is canceled during uploadPartOfData, + // This is the first chance to cancel and clean up upload. + // Otherwise GPDB will call with LAST_CALL but QueryCancelPending is set to false, + // and we can't detect query cancel signal in S3KeyWriter::close(). + this->checkQueryCancelSignal(); } } diff --git a/gpAux/extensions/gps3ext/src/s3restful_service.cpp b/gpAux/extensions/gps3ext/src/s3restful_service.cpp index 2bb96b4ec3..ea67ddc923 100644 --- a/gpAux/extensions/gps3ext/src/s3restful_service.cpp +++ b/gpAux/extensions/gps3ext/src/s3restful_service.cpp @@ -39,7 +39,7 @@ size_t RESTfulServiceWriteFuncCallback(char *ptr, size_t size, size_t nmemb, voi return realsize; } -// curl's write function callback, used only by DELETE request when query is canceled. +// cURL's write function callback, only used by DELETE request when query is canceled. // It shouldn't be interrupted by QueryCancelPending. 
size_t RESTfulServiceAbortFuncCallback(char *ptr, size_t size, size_t nmemb, void *userp) { size_t realsize = size * nmemb; @@ -409,8 +409,7 @@ Response S3RESTfulService::deleteRequest(const string &url, HTTPHeaders &headers curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &responseCode); // 2XX are successful response. Here we deal with 200 (OK) 204 (no content), and 206 - // (partial content) - // firstly. + // (partial content) firstly. if (isSuccessfulResponse(responseCode)) { response.setStatus(RESPONSE_OK); response.setMessage("Success"); diff --git a/gpAux/extensions/gps3ext/test/compress_writer_test.cpp b/gpAux/extensions/gps3ext/test/compress_writer_test.cpp new file mode 100644 index 0000000000..07f6804fa5 --- /dev/null +++ b/gpAux/extensions/gps3ext/test/compress_writer_test.cpp @@ -0,0 +1,162 @@ +#include + +#include "compress_writer.cpp" +#include "gtest/gtest.h" +#include "s3macros.h" + +using std::vector; + +class MockWriter : public Writer { + public: + MockWriter() { + } + virtual ~MockWriter() { + } + + virtual void open(const WriterParams ¶ms) { + } + + virtual uint64_t write(const char *buf, uint64_t count) { + this->data.insert(this->data.end(), buf, buf + count); + return count; + } + + virtual void close() { + // this->data.clear(); + } + + const char *getRawData() const { + return this->data.data(); + } + + vector &getRawDataVector() { + return this->data; + } + + size_t getDataSize() const { + return this->data.size(); + } + + private: + vector data; +}; + +class CompressWriterTest : public testing::Test { + protected: + // Remember that SetUp() is run immediately before a test starts. + virtual void SetUp() { + compressWriter.setWriter(&writer); + compressWriter.open(params); + + this->out = new Byte[S3_ZIP_DECOMPRESS_CHUNKSIZE]; + } + + // TearDown() is invoked immediately after a test finishes. 
+ virtual void TearDown() { + compressWriter.close(); + + delete[] this->out; + } + + void simpleUncompress(const char *input, uint64_t len) { + z_stream zstream; + + // allocate inflate state for zlib + zstream.zalloc = Z_NULL; + zstream.zfree = Z_NULL; + zstream.opaque = Z_NULL; + + int ret = inflateInit2(&zstream, S3_INFLATE_WINDOWSBITS); + CHECK_OR_DIE_MSG(ret == Z_OK, "%s", "failed to initialize zlib library"); + + zstream.avail_in = len; + zstream.next_in = (Byte *)input; + zstream.next_out = this->out; + zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE; + + ret = inflate(&zstream, Z_FULL_FLUSH); + + if (ret != Z_STREAM_END) { + S3DEBUG("Failed to uncompress sample data"); + } + + inflateEnd(&zstream); + } + + WriterParams params; + CompressWriter compressWriter; + MockWriter writer; + + Byte *out; +}; + +TEST_F(CompressWriterTest, AbleToInputNull) { + compressWriter.write(NULL, 0); + EXPECT_EQ(0, writer.getDataSize()); +} + +TEST_F(CompressWriterTest, AbleToCompressEmptyData) { + char input[10] = {0}; + compressWriter.write(input, sizeof(input)); + compressWriter.close(); + + this->simpleUncompress(writer.getRawData(), writer.getDataSize()); + EXPECT_STREQ(input, (const char *)this->out); +} + +TEST_F(CompressWriterTest, AbleToCompressAndCheckGZipHeader) { + char input[10] = {0}; + compressWriter.write(input, sizeof(input)); + compressWriter.close(); + + const char *header = writer.getRawData(); + ASSERT_TRUE(header[0] == char(0x1f)); + ASSERT_TRUE(header[1] == char(0x8b)); +} + +TEST_F(CompressWriterTest, CloseMultipleTimes) { + char input[10] = {0}; + compressWriter.write(input, sizeof(input)); + + compressWriter.close(); + compressWriter.close(); + compressWriter.close(); + compressWriter.close(); + + this->simpleUncompress(writer.getRawData(), writer.getDataSize()); + EXPECT_STREQ(input, (const char *)this->out); +} + +TEST_F(CompressWriterTest, AbleToCompressOneSmallString) { + const char input[] = "The quick brown fox jumps over the lazy dog"; + + 
compressWriter.write(input, sizeof(input)); + compressWriter.close(); + + this->simpleUncompress(writer.getRawData(), writer.getDataSize()); + EXPECT_STREQ(input, (const char *)this->out); +} + +TEST_F(CompressWriterTest, AbleToWriteServeralTimesBeforeClose) { + unsigned int i, times = 100; + + const char input[] = "The quick brown fox jumps over the lazy dog"; + + for (i = 0; i < times; i++) { + compressWriter.write(input, sizeof(input)); + } + + compressWriter.close(); + + this->simpleUncompress(writer.getRawData(), writer.getDataSize()); + + for (i = 0; i < times; i++) { + ASSERT_TRUE(memcmp(input, (const char *)this->out + i * sizeof(input), sizeof(input)) == 0); + } +} + +TEST_F(CompressWriterTest, AbleToWriteLargerThanCompressChunkSize) { + char *input = new char[S3_ZIP_COMPRESS_CHUNKSIZE + 1]; + + EXPECT_THROW(compressWriter.write(input, S3_ZIP_COMPRESS_CHUNKSIZE + 1), std::runtime_error); +} \ No newline at end of file diff --git a/gpAux/extensions/gps3ext/test/decompress_reader_test.cpp b/gpAux/extensions/gps3ext/test/decompress_reader_test.cpp index 84874ea52e..6dbf553502 100644 --- a/gpAux/extensions/gps3ext/test/decompress_reader_test.cpp +++ b/gpAux/extensions/gps3ext/test/decompress_reader_test.cpp @@ -2,6 +2,7 @@ #include "decompress_reader.cpp" #include "gtest/gtest.h" +#include "s3macros.h" using std::vector; @@ -59,6 +60,9 @@ class DecompressReaderTest : public testing::Test { protected: // Remember that SetUp() is run immediately before a test starts. virtual void SetUp() { + // reset to default, because some tests will modify it + S3_ZIP_DECOMPRESS_CHUNKSIZE = S3_ZIP_DEFAULT_CHUNKSIZE; + // need to setup upstreamReader before open. this->bufReader.setChunkSize(1024 * 1024 * 64); decompressReader.setReader(&bufReader); @@ -68,6 +72,9 @@ class DecompressReaderTest : public testing::Test { // TearDown() is invoked immediately after a test finishes. 
virtual void TearDown() { decompressReader.close(); + + // reset to default, because some tests will modify it + S3_ZIP_DECOMPRESS_CHUNKSIZE = S3_ZIP_DEFAULT_CHUNKSIZE; } void setBufReaderByRawData(const void *input, int len) { @@ -149,10 +156,10 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithSmallReadBuffer) { // output buffer is smaller than chunk size (16 bytes). // resize to 32 for 'in' and 'out' buffer - S3_ZIP_CHUNKSIZE = 32; - decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE); + S3_ZIP_DECOMPRESS_CHUNKSIZE = 32; + decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE); - char hello[S3_ZIP_CHUNKSIZE + 2]; + char hello[S3_ZIP_DECOMPRESS_CHUNKSIZE + 2]; memset((void *)hello, 'A', sizeof(hello)); hello[sizeof(hello) - 1] = '\0'; @@ -174,8 +181,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithSmallInternalReaderBuffer) { // output buffer is smaller than chunk size (9 bytes). // resize to 32 for 'in' and 'out' buffer - S3_ZIP_CHUNKSIZE = 10; - decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE); + S3_ZIP_DECOMPRESS_CHUNKSIZE = 10; + decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE); char hello[34]; // compress 34 'A' will produce 12 compressed bytes. memset((void *)hello, 'A', sizeof(hello)); @@ -192,8 +199,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithSmallInternalReaderBuffer) { } TEST_F(DecompressReaderTest, ReadFromOffsetForEachCall) { - S3_ZIP_CHUNKSIZE = 128; - decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE); + S3_ZIP_DECOMPRESS_CHUNKSIZE = 128; + decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE); // Bigger chunk size, smaller read buffer from caller. Need read from offset for each call. 
char hello[] = "abcdefghigklmnopqrstuvwxyz"; @@ -218,10 +225,10 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithAlignedLargeReadBuffer) { // resize to 8 for 'in' and 'out' buffer - S3_ZIP_CHUNKSIZE = 8; - decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE); + S3_ZIP_DECOMPRESS_CHUNKSIZE = 8; + decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE); - char hello[S3_ZIP_CHUNKSIZE * 2 + 2]; + char hello[S3_ZIP_DECOMPRESS_CHUNKSIZE * 2 + 2]; memset((void *)hello, 'A', sizeof(hello)); hello[sizeof(hello) - 1] = '\0'; @@ -239,16 +246,16 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithUnalignedLargeReadBuffer) { // Test case for: optimal buffer size fill after decompression // We need to make sure that we are filling the decompression // buffer fully before asking for a new chunck from the read buffer - S3_ZIP_CHUNKSIZE = 8; - decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE); + S3_ZIP_DECOMPRESS_CHUNKSIZE = 8; + decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE); - char hello[S3_ZIP_CHUNKSIZE * 6 + 2]; + char hello[S3_ZIP_DECOMPRESS_CHUNKSIZE * 6 + 2]; memset((void *)hello, 'A', sizeof(hello)); hello[sizeof(hello) - 1] = '\0'; setBufReaderByRawData(hello, sizeof(hello)); - char outputBuffer[S3_ZIP_CHUNKSIZE * 6 + 4]; + char outputBuffer[S3_ZIP_DECOMPRESS_CHUNKSIZE * 6 + 4]; uint32_t expectedLen[] = {8, 8, 8, 8, 8, 8, 2, 0}; for (uint32_t i = 0; i < sizeof(expectedLen) / sizeof(uint32_t); i++) { @@ -257,8 +264,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithUnalignedLargeReadBuffer) { } TEST_F(DecompressReaderTest, AbleToDecompressWithLargeReadBufferWithDecompressableString) { - S3_ZIP_CHUNKSIZE = 8; - decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE); + S3_ZIP_DECOMPRESS_CHUNKSIZE = 8; + decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE); // Smaller chunk size, bigger read buffer from caller. Need composite multiple chunks. 
char hello[] = "abcdefghigklmnopqrstuvwxyz"; // 26+1 bytes @@ -278,8 +285,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithLargeReadBufferWithDecompressab } TEST_F(DecompressReaderTest, AbleToDecompressWithSmartLargeReadBufferWithDecompressableString) { - S3_ZIP_CHUNKSIZE = 7; - decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE); + S3_ZIP_DECOMPRESS_CHUNKSIZE = 7; + decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE); // Smaller chunk size, bigger read buffer from caller. Need composite multiple chunks. char hello[] = "abcdefghigklmnopqrstuvwxyz"; // 26+1 bytes @@ -300,8 +307,8 @@ TEST_F(DecompressReaderTest, AbleToDecompressWithSmartLargeReadBufferWithDecompr } TEST_F(DecompressReaderTest, AbleToDecompressWithIncorrectEncodedStream) { - S3_ZIP_CHUNKSIZE = 128; - decompressReader.resizeDecompressReaderBuffer(S3_ZIP_CHUNKSIZE); + S3_ZIP_DECOMPRESS_CHUNKSIZE = 128; + decompressReader.resizeDecompressReaderBuffer(S3_ZIP_DECOMPRESS_CHUNKSIZE); // set an incorrect encoding stream to Mock directly. 
// it will produce 'Z_DATA_ERROR' when decompressing diff --git a/gpAux/extensions/gps3ext/test/s3common_writer_test.cpp b/gpAux/extensions/gps3ext/test/s3common_writer_test.cpp new file mode 100644 index 0000000000..97ac1f146e --- /dev/null +++ b/gpAux/extensions/gps3ext/test/s3common_writer_test.cpp @@ -0,0 +1,237 @@ +#include "s3common_writer.cpp" +#include "gtest/gtest.h" +#include "mock_classes.h" +#include "s3macros.h" + +#include +#include +using std::vector; + +using ::testing::AtLeast; +using ::testing::Return; +using ::testing::Invoke; +using ::testing::Throw; +using ::testing::_; + +class MockS3InterfaceForCompressionWrite : public MockS3Interface { + public: + MockS3InterfaceForCompressionWrite(){}; + + const uint8_t* getRawData() const { + return this->data.data(); + } + + size_t getDataSize() const { + return this->data.size(); + } + + size_t getPartNumber() const { + return this->dataMap.size(); + } + + bool mockCheckKeyExistence(const string& keyUrl, const string& region, + const S3Credential& cred) { + return false; + } + + string mockGetUploadId(const string& keyUrl, const string& region, const S3Credential& cred) { + return this->uploadID; + } + + string mockUploadPartOfData(vector& data, const string& keyUrl, const string& region, + const S3Credential& cred, uint64_t partNumber, + const string& uploadId) { + this->dataMap[partNumber] = data; + return this->uploadID; + } + + bool mockCompleteMultiPart(const string& keyUrl, const string& region, const S3Credential& cred, + const string& uploadId, const vector& etagArray) { + map>::iterator it; + for (it = this->dataMap.begin(); it != this->dataMap.end(); it++) { + this->data.insert(this->data.end(), it->second.begin(), it->second.end()); + } + return true; + } + + private: + vector data; + map> dataMap; + const string uploadID = "I_am_an_uploadID"; +}; + +class S3CommonWriteTest : public ::testing::Test, public S3CommonWriter { + protected: + // Remember that SetUp() is run immediately before a 
test starts. + virtual void SetUp() { + this->setS3service(&mockS3Interface); + + this->out = new Byte[S3_ZIP_DECOMPRESS_CHUNKSIZE]; + } + + // TearDown() is invoked immediately after a test finishes. + virtual void TearDown() { + this->close(); + + delete[] this->out; + } + + void simpleUncompress(const char* input, uint64_t len) { + z_stream zstream; + + // allocate inflate state for zlib + zstream.zalloc = Z_NULL; + zstream.zfree = Z_NULL; + zstream.opaque = Z_NULL; + + int ret = inflateInit2(&zstream, S3_INFLATE_WINDOWSBITS); + CHECK_OR_DIE_MSG(ret == Z_OK, "%s", "failed to initialize zlib library"); + + zstream.avail_in = len; + zstream.next_in = (Byte*)input; + zstream.next_out = this->out; + zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE; + + ret = inflate(&zstream, Z_FULL_FLUSH); + + if (ret != Z_STREAM_END) { + S3DEBUG("Failed to uncompress sample data"); + } + + inflateEnd(&zstream); + } + + MockS3InterfaceForCompressionWrite mockS3Interface; + Byte* out; +}; + +TEST_F(S3CommonWriteTest, UsingGZip) { + s3ext_autocompress = true; + + EXPECT_CALL(mockS3Interface, getUploadId(_, _, _)) + .WillOnce(Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockGetUploadId)); + EXPECT_CALL(mockS3Interface, uploadPartOfData(_, _, _, _, _, _)) + .WillOnce( + Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockUploadPartOfData)); + EXPECT_CALL(mockS3Interface, completeMultiPart(_, _, _, _, _)) + .WillOnce( + Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockCompleteMultiPart)); + + WriterParams params; + params.setNumOfChunks(1); + params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1); + + this->open(params); + + ASSERT_EQ(this->upstreamWriter, &this->compressWriter); + ASSERT_TRUE(NULL != dynamic_cast(this->upstreamWriter)); +} + +TEST_F(S3CommonWriteTest, UsingPlain) { + s3ext_autocompress = false; + + EXPECT_CALL(mockS3Interface, getUploadId(_, _, _)) + .WillOnce(Invoke(&mockS3Interface, 
&MockS3InterfaceForCompressionWrite::mockGetUploadId)); + + WriterParams params; + params.setNumOfChunks(1); + params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1); + + this->open(params); + + ASSERT_EQ(this->upstreamWriter, &this->keyWriter); + ASSERT_TRUE(NULL != dynamic_cast(this->upstreamWriter)); +} + +TEST_F(S3CommonWriteTest, WritePlainData) { + s3ext_autocompress = false; + + // EXPECT_CALL(mockS3Interface, checkKeyExistence(_, _, _)) + // .WillOnce( + // Invoke(&mockS3Interface, + // &MockS3InterfaceForCompressionWrite::mockCheckKeyExistence)); + EXPECT_CALL(mockS3Interface, getUploadId(_, _, _)) + .WillOnce(Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockGetUploadId)); + EXPECT_CALL(mockS3Interface, uploadPartOfData(_, _, _, _, _, _)) + .WillOnce( + Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockUploadPartOfData)); + EXPECT_CALL(mockS3Interface, completeMultiPart(_, _, _, _, _)) + .WillOnce( + Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockCompleteMultiPart)); + + WriterParams params; + params.setNumOfChunks(1); + params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1); + + this->open(params); + + // 44 bytes + const char input[] = "The quick brown fox jumps over the lazy dog"; + this->write(input, sizeof(input)); + this->close(); + + EXPECT_STREQ(input, (const char*)this->mockS3Interface.getRawData()); +} + +TEST_F(S3CommonWriteTest, WriteGZipData) { + s3ext_autocompress = true; + + EXPECT_CALL(mockS3Interface, getUploadId(_, _, _)) + .WillOnce(Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockGetUploadId)); + EXPECT_CALL(mockS3Interface, uploadPartOfData(_, _, _, _, _, _)) + .WillOnce( + Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockUploadPartOfData)); + EXPECT_CALL(mockS3Interface, completeMultiPart(_, _, _, _, _)) + .WillOnce( + Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockCompleteMultiPart)); + + WriterParams params; + params.setNumOfChunks(1); + 
params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1); + + this->open(params); + + // 44 bytes + const char input[] = "The quick brown fox jumps over the lazy dog"; + this->write(input, sizeof(input)); + this->close(); + + this->simpleUncompress((const char*)this->mockS3Interface.getRawData(), + this->mockS3Interface.getDataSize()); + EXPECT_STREQ(input, (const char*)this->out); +} + +TEST_F(S3CommonWriteTest, WriteGZipDataMultipleTimes) { + s3ext_autocompress = true; + + EXPECT_CALL(mockS3Interface, getUploadId(_, _, _)) + .WillRepeatedly( + Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockGetUploadId)); + EXPECT_CALL(mockS3Interface, uploadPartOfData(_, _, _, _, _, _)) + .WillRepeatedly( + Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockUploadPartOfData)); + EXPECT_CALL(mockS3Interface, completeMultiPart(_, _, _, _, _)) + .WillRepeatedly( + Invoke(&mockS3Interface, &MockS3InterfaceForCompressionWrite::mockCompleteMultiPart)); + + WriterParams params; + params.setNumOfChunks(1); + params.setChunkSize(S3_ZIP_COMPRESS_CHUNKSIZE + 1); + + this->open(params); + + // 44 bytes * 10 + const char input[] = "The quick brown fox jumps over the lazy dog"; + int times = 10; + for (int i = 0; i < times; i++) { + this->write(input, sizeof(input)); + } + this->close(); + + this->simpleUncompress((const char*)this->mockS3Interface.getRawData(), + this->mockS3Interface.getDataSize()); + + for (int i = 0; i < times; i++) { + ASSERT_TRUE(memcmp(input, (const char*)this->out + i * sizeof(input), sizeof(input)) == 0); + } +} \ No newline at end of file -- GitLab