diff --git a/gpAux/extensions/gps3ext/Makefile b/gpAux/extensions/gps3ext/Makefile
index 5a06c8587040a80def2cf379b94e52835d600772..beb336748755777878da532d849d8a52127d6367 100644
--- a/gpAux/extensions/gps3ext/Makefile
+++ b/gpAux/extensions/gps3ext/Makefile
@@ -3,7 +3,7 @@ DEBUG_S3 = n
 DEBUG_S3_CURL = n
 
 # Flags
-SHLIB_LINK = -lstdc++ -lxml2 -lpthread -lcrypto -lcurl
+SHLIB_LINK = -lstdc++ -lxml2 -lpthread -lcrypto -lcurl -lz
 PG_CPPFLAGS = -O2 -g -std=c++98 -fPIC -I$(libpq_srcdir) -Isrc -Ilib -I/usr/include/libxml2 -I$(libpq_srcdir)/postgresql/server/utils
 
 ifeq ($(DEBUG_S3),y)
diff --git a/gpAux/extensions/gps3ext/Makefile.test b/gpAux/extensions/gps3ext/Makefile.test
index 2fe02a426c1760d651b2e49aa718b423c4490ddb..4a4745a486c19be3000502d4fdc1d141206c6656 100644
--- a/gpAux/extensions/gps3ext/Makefile.test
+++ b/gpAux/extensions/gps3ext/Makefile.test
@@ -3,12 +3,12 @@ DEBUG_S3 = y
 DEBUG_S3_CURL = n
 AWS_TEST = y
 FAKE_TEST = y
-BIG_FILE_TEST = y
+BIG_FILE_TEST = n
 
 # Flags
 CPP = g++
 INCLUDES = -Isrc -Ilib -I/usr/include/libxml2
-LDFLAGS = -lpthread -lcrypto -lcurl -lxml2 -lgcov
+LDFLAGS = -lpthread -lcrypto -lcurl -lxml2 -lgcov -lz
 CFLAGS = -O2 -g3 -std=c++98 -fPIC -fprofile-arcs -ftest-coverage
 
 ifeq ($(DEBUG_S3),y)
diff --git a/gpAux/extensions/gps3ext/src/s3downloader.cpp b/gpAux/extensions/gps3ext/src/s3downloader.cpp
index 0b71c0a4cc3a81761583d2c347b55238943506cf..2bb7c22fcb0f337c68e0d790b01cba1c3cbcc19b 100644
--- a/gpAux/extensions/gps3ext/src/s3downloader.cpp
+++ b/gpAux/extensions/gps3ext/src/s3downloader.cpp
@@ -9,6 +9,7 @@
 #include <...>
 #include <...>
 #include <...>
+#include <zlib.h>
 
 #include "gps3ext.h"
 #include "s3downloader.h"
@@ -84,14 +85,24 @@ bool BlockingBuffer::Init() {
 
 // ret < len means EMPTY
 uint64_t BlockingBuffer::Read(char *buf, uint64_t len) {
+    // QueryCancelPending stops s3_import(). This check would not be needed
+    // if s3_import() called BlockingBuffer->Read() only once, but for the
+    // decompression feature Downloader->get() calls Read() twice: the first
+    // call may set the buffer to STATUS_EMPTY, and the second would hang.
+    if (QueryCancelPending) {
+        S3INFO("Buffer reading is interrupted by GPDB");
+        return 0;
+    }
+
     // assert buf not null
     // assert len > 0, len < this->bufcap
     pthread_mutex_lock(&this->stat_mutex);
     while (this->status == BlockingBuffer::STATUS_EMPTY) {
         pthread_cond_wait(&this->stat_cond, &this->stat_mutex);
     }
+
     uint64_t left_data_length = this->realsize - this->readpos;
-    int length_to_read = std::min(len, left_data_length);
+    uint64_t length_to_read = std::min(len, left_data_length);
     memcpy(buf, this->bufferdata + this->readpos, length_to_read);
     if (left_data_length >= len) {
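Note: the hang the new QueryCancelPending check prevents is easiest to see in isolation. The sketch below uses hypothetical names (SimpleBuffer, cancelled), not the extension's actual classes; cancelled plays the role of GPDB's QueryCancelPending. Once a query is cancelled and the downloader threads stop refilling the buffer, a second Read() would otherwise block on the condition variable forever.

    #include <pthread.h>
    #include <stdint.h>
    #include <string.h>

    // Hypothetical stand-in for BlockingBuffer, reduced to the blocking path.
    struct SimpleBuffer {
        pthread_mutex_t mtx;
        pthread_cond_t cond;
        bool empty;
        volatile bool cancelled;  // stand-in for GPDB's QueryCancelPending

        uint64_t Read(char *dst, uint64_t len) {
            if (cancelled) return 0;  // the early-out this patch adds
            pthread_mutex_lock(&mtx);
            while (empty) {
                // With no producer left to signal, this waits forever --
                // exactly the hang seen on the second Read() after cancel.
                pthread_cond_wait(&cond, &mtx);
            }
            memset(dst, 0, len);  // placeholder for the real copy-out
            empty = true;         // this read drained the buffer
            pthread_mutex_unlock(&mtx);
            return len;
        }
    };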
@@ -212,7 +223,13 @@ void *DownloadThreadfunc(void *data) {
 }
 
 Downloader::Downloader(uint8_t part_num)
-    : num(part_num), o(NULL), chunkcount(0), readlen(0) {
+    : num(part_num),
+      o(NULL),
+      chunkcount(0),
+      readlen(0),
+      magic_bytes_num(0),
+      compression(S3_ZIP_NONE),
+      z_info(NULL) {
     this->threads = (pthread_t *)malloc(num * sizeof(pthread_t));
     if (this->threads)
         memset((void *)this->threads, 0, num * sizeof(pthread_t));
@@ -250,23 +267,89 @@ bool Downloader::init(string url, string region, uint64_t size,
         pthread_create(&this->threads[i], NULL, DownloadThreadfunc,
                        this->buffers[i]);
     }
+
     readlen = 0;
     chunkcount = 0;
+    memset(this->magic_bytes, 0, sizeof(this->magic_bytes));
+
+    return true;
+}
+
+bool Downloader::set_compression() {
+    if ((this->magic_bytes[0] == 0x1f) && (this->magic_bytes[1] == 0x8b)) {
+        this->compression = S3_ZIP_GZIP;
+
+        this->z_info = new zstream_info();
+        if (!this->z_info) {
+            S3ERROR("Failed to allocate memory");
+            return false;
+        }
+
+        this->z_info->inited = false;
+        this->z_info->in = NULL;
+        this->z_info->out = NULL;
+        this->z_info->done_out = 0;
+        this->z_info->have_out = 0;
+    } else {
+        this->compression = S3_ZIP_NONE;
+    }
+
     return true;
 }
 
 bool Downloader::get(char *data, uint64_t &len) {
+    if (this->magic_bytes_num == 0) {
+        // read the first 4 (at least 2) bytes to check whether this file
+        // is compressed
+        BlockingBuffer *buf = buffers[this->chunkcount % this->num];
+
+        if ((this->magic_bytes_num = buf->Read(
+                 (char *)this->magic_bytes, sizeof(this->magic_bytes))) > 1) {
+            if (!this->set_compression()) {
+                return false;
+            }
+        }
+    }
+
+    switch (this->compression) {
+        case S3_ZIP_GZIP:
+            return this->zstream_get(data, len);
+            break;
+        default:
+            return this->plain_get(data, len);
+    }
+}
+
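Note: the dispatch in get() above relies on the gzip magic number: per RFC 1952, every gzip member begins with the bytes 0x1f 0x8b, so two bytes are enough to decide. A self-contained sketch of the same rule follows (illustrative names only; the extension additionally keeps the sniffed bytes in magic_bytes so plain_get()/zstream_get() can replay them later):

    #include <stddef.h>
    #include <stdio.h>

    enum compression_type { S3_ZIP_NONE, S3_ZIP_GZIP };

    // A gzip member always starts with the magic bytes 0x1f 0x8b (RFC 1952),
    // so two bytes suffice to decide; the extension reads up to four.
    static compression_type detect_compression(const unsigned char *magic,
                                               size_t n) {
        if (n >= 2 && magic[0] == 0x1f && magic[1] == 0x8b) return S3_ZIP_GZIP;
        return S3_ZIP_NONE;
    }

    int main() {
        const unsigned char gz[4] = {0x1f, 0x8b, 0x08, 0x00};  // gzip header
        const unsigned char csv[4] = {'Y', 'e', 'a', 'r'};     // plain text
        printf("%d %d\n", detect_compression(gz, 4), detect_compression(csv, 4));
        return 0;  // prints "1 0"
    }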
+bool Downloader::plain_get(char *data, uint64_t &len) {
     uint64_t filelen = this->o->Size();
+    uint64_t tmplen = 0;
 
 RETRY:
+    // no more data available; we are done with this file
     if (this->readlen == filelen) {
         len = 0;
         return true;
     }
 
     BlockingBuffer *buf = buffers[this->chunkcount % this->num];
-    uint64_t tmplen = buf->Read(data, len);
+
+    // get data from this->magic_bytes, or buf->Read(), or both
+    if (this->readlen < this->magic_bytes_num) {
+        if ((this->readlen + len) <= this->magic_bytes_num) {
+            memcpy(data, this->magic_bytes + this->readlen, len);
+            tmplen = len;
+        } else {
+            memcpy(data, this->magic_bytes + this->readlen,
+                   this->magic_bytes_num - this->readlen);
+            tmplen = this->magic_bytes_num - this->readlen +
+                     buf->Read(data + this->magic_bytes_num - this->readlen,
+                               this->readlen + len - this->magic_bytes_num);
+        }
+    } else {
+        tmplen = buf->Read(data, len);
+    }
+
     this->readlen += tmplen;
+
     if (tmplen < len) {
         this->chunkcount++;
         if (buf->Error()) {
@@ -286,21 +369,166 @@ RETRY:
     return true;
 }
 
+bool Downloader::zstream_get(char *data, uint64_t &len) {
+    uint64_t filelen = this->o->Size();
+
+// S3_ZIP_CHUNKSIZE is simply the buffer size for feeding data to and
+// pulling data from the zlib routines; 256K is the chunk size recommended
+// by zlib.
+#define S3_ZIP_CHUNKSIZE (256 * 1024)
+    uint32_t left_out = 0;
+    zstream_info *zinfo = this->z_info;
+    z_stream *strm = &zinfo->zstream;
+
+RETRY:
+    // fail-safe in case there is a (very unlikely) infinite-loop bug, e.g.
+    // S3 returning a file size that is larger than the actual one. This has
+    // never been seen, but better to be careful.
+    if (this->chunkcount > (this->o->Size() / this->o->Chunksize() + 2)) {
+        if (zinfo->inited) {
+            inflateEnd(strm);
+        }
+        len = 0;
+        return false;
+    }
+
+    // no more data available to read, decompress or copy; we are done with
+    // this file
+    if (this->readlen == filelen && !(zinfo->have_out - zinfo->done_out) &&
+        !strm->avail_in) {
+        if (zinfo->inited) {
+            inflateEnd(strm);
+        }
+        len = 0;
+        return true;
+    }
+
+    BlockingBuffer *buf = buffers[this->chunkcount % this->num];
+
+    // strm is the structure zlib uses to decompress the stream
+    if (!zinfo->inited) {
+        strm->zalloc = Z_NULL;
+        strm->zfree = Z_NULL;
+        strm->opaque = Z_NULL;
+        strm->avail_in = 0;
+        strm->next_in = Z_NULL;
+
+        zinfo->in = (unsigned char *)malloc(S3_ZIP_CHUNKSIZE);
+        zinfo->out = (unsigned char *)malloc(S3_ZIP_CHUNKSIZE);
+        if (!zinfo->in || !zinfo->out) {
+            S3ERROR("Failed to allocate memory");
+            return false;
+        }
+
+        // 47 (= 32 + 15) window bits lets zlib recognize and decode the
+        // gzip format automatically
+        if (inflateInit2(strm, 47) != Z_OK) {
+            S3ERROR("Failed to init gzip function");
+            return false;
+        }
+
+        zinfo->inited = true;
+    }
+
+    do {
+        // copy decompressed data
+        left_out = zinfo->have_out - zinfo->done_out;
+        if (left_out > len) {
+            memcpy(data, zinfo->out + zinfo->done_out, len);
+            zinfo->done_out += len;
+            break;
+        } else if (left_out) {
+            memcpy(data, zinfo->out + zinfo->done_out, left_out);
+            zinfo->done_out = 0;
+            zinfo->have_out = 0;
+            len = left_out;
+            break;
+        }
+
+        // get another decompressed chunk
+        if (this->readlen && (strm->avail_in != 0)) {
+            strm->avail_out = S3_ZIP_CHUNKSIZE;
+            strm->next_out = zinfo->out;
+
+            switch (inflate(strm, Z_NO_FLUSH)) {
+                case Z_STREAM_ERROR:
+                case Z_NEED_DICT:
+                case Z_DATA_ERROR:
+                case Z_MEM_ERROR:
+                    S3ERROR("Failed to decompress data");
+                    inflateEnd(strm);
+                    return false;
+            }
+
+            zinfo->have_out = S3_ZIP_CHUNKSIZE - strm->avail_out;
+        }
+
+        // get another compressed chunk, from magic_bytes, or buf->Read(),
+        // or both
+        if (!zinfo->have_out) {
+            if (this->readlen < this->magic_bytes_num) {
+                memcpy(zinfo->in, this->magic_bytes + this->readlen,
+                       this->magic_bytes_num - this->readlen);
+                strm->avail_in =
+                    this->magic_bytes_num - this->readlen +
+                    buf->Read((char *)zinfo->in + this->magic_bytes_num -
+                                  this->readlen,
+                              S3_ZIP_CHUNKSIZE - this->magic_bytes_num +
+                                  this->readlen);
+            } else {
+                strm->avail_in = buf->Read((char *)zinfo->in, S3_ZIP_CHUNKSIZE);
+            }
+
+            if (buf->Error()) {
+                S3ERROR("Error occurred while downloading, skip");
+                inflateEnd(strm);
+                return false;
+            }
+            strm->next_in = zinfo->in;
+
+            // readlen counts bytes read from the original (compressed)
+            // file, not decompressed bytes
+            this->readlen += strm->avail_in;
+
+            // done with *reading* this compressed file; we still need to
+            // confirm it has all been decompressed and handed out by get()
+            if (strm->avail_in == 0) {
+                this->chunkcount++;
+                goto RETRY;
+            }
+        }
+    } while (1);
+
+    return true;
+}
+
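Note: zstream_get() above follows the chunked-inflate pattern from zlib's zpipe.c example, adapted to pull compressed bytes from BlockingBuffer and to hand decompressed bytes out in caller-sized pieces. Reduced to plain stdio, with the same 256K chunk size and the same windowBits value, the core loop looks like this (a sketch assuming a well-formed gzip stream on stdin):

    #include <stdio.h>
    #include <string.h>
    #include <zlib.h>

    #define CHUNK (256 * 1024)  // same buffer size the patch uses

    // Decompress a gzip stream from src to dst; returns 0 on success.
    static int gunzip_stream(FILE *src, FILE *dst) {
        static unsigned char in[CHUNK], out[CHUNK];
        z_stream strm;
        memset(&strm, 0, sizeof(strm));

        // 47 = 32 + 15: auto-detect gzip/zlib headers, 32KB window
        if (inflateInit2(&strm, 47) != Z_OK) return -1;

        int ret = Z_OK;
        do {
            strm.avail_in = fread(in, 1, CHUNK, src);
            if (strm.avail_in == 0) break;  // EOF before end of stream
            strm.next_in = in;

            // drain everything zlib can produce from this input chunk
            do {
                strm.avail_out = CHUNK;
                strm.next_out = out;
                ret = inflate(&strm, Z_NO_FLUSH);
                if (ret == Z_STREAM_ERROR || ret == Z_NEED_DICT ||
                    ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) {
                    inflateEnd(&strm);
                    return -1;
                }
                fwrite(out, 1, CHUNK - strm.avail_out, dst);
            } while (strm.avail_out == 0);
        } while (ret != Z_STREAM_END);

        inflateEnd(&strm);
        return ret == Z_STREAM_END ? 0 : -1;
    }

    int main() { return gunzip_stream(stdin, stdout); }

Passing 47 (= 32 + 15) to inflateInit2() asks zlib to auto-detect a gzip or zlib header and use the maximum 32KB window, which is why the extension can decode .gz files without parsing the header itself. Build with -lz, matching the new SHLIB_LINK/LDFLAGS above.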
 void Downloader::destroy() {
     for (int i = 0; i < this->num; i++) {
         if (this->threads && this->threads[i]) pthread_cancel(this->threads[i]);
     }
+
     for (int i = 0; i < this->num; i++) {
         if (this->threads && this->threads[i])
             pthread_join(this->threads[i], NULL);
         if (this->buffers && this->buffers[i]) delete this->buffers[i];
     }
+
     if (this->o) delete this->o;
 }
 
 Downloader::~Downloader() {
     if (this->threads) free(this->threads);
     if (this->buffers) free(this->buffers);
+
+    if (this->z_info) {
+        if (this->z_info->in) {
+            free(this->z_info->in);
+            this->z_info->in = NULL;
+        }
+        if (this->z_info->out) {
+            free(this->z_info->out);
+            this->z_info->out = NULL;
+        }
+
+        delete this->z_info;
+        this->z_info = NULL;
+    }
 }
 
 // return the number of items
@@ -622,7 +850,7 @@ static bool extractContent(ListBucketResult *result, xmlNode *root_element,
                 S3ERROR("Faild to create item for %s", key);
             }
         } else {
-            S3INFO("Size of %s is " PRIu64 ", skip it", key, size);
+            S3INFO("Size of \"%s\" is %" PRIu64 ", skip it", key, size);
         }
     }
 
diff --git a/gpAux/extensions/gps3ext/src/s3downloader.h b/gpAux/extensions/gps3ext/src/s3downloader.h
index 328dd7a8f28420ee572de50ee3c0cb8eceff2bd6..258de635aa1032c160607a071c664285442585b6 100644
--- a/gpAux/extensions/gps3ext/src/s3downloader.h
+++ b/gpAux/extensions/gps3ext/src/s3downloader.h
@@ -17,6 +17,7 @@
 #include <...>
 #include <...>
+#include <zlib.h>
 
 #include "s3common.h"
 
@@ -28,6 +29,11 @@ struct Range {
     uint64_t len;
 };
 
+typedef enum compression_type {
+    S3_ZIP_NONE,
+    S3_ZIP_GZIP,
+} compression_type_t;
+
 class OffsetMgr {
    public:
     OffsetMgr(uint64_t maxsize, uint64_t chunksize);
@@ -79,15 +85,24 @@ class BlockingBuffer {
     Range nextpos;
 };
 
-struct Downloader {
+struct zstream_info {
+    z_stream zstream;
+    bool inited;
+    unsigned char* in;
+    unsigned char* out;
+    uint64_t have_out;
+    uint64_t done_out;
+};
+
+class Downloader {
+   public:
     Downloader(uint8_t part_num);
     ~Downloader();
     bool init(string url, string region, uint64_t size, uint64_t chunksize,
               S3Credential* pcred);
     bool get(char* buf, uint64_t& len);
     void destroy();
-    // reset
-    // init(url)
+
+   private:
     const uint8_t num;
     pthread_t* threads;
@@ -95,6 +110,16 @@ struct Downloader {
     OffsetMgr* o;
     uint64_t chunkcount;
     uint64_t readlen;
+
+    unsigned char magic_bytes[4];
+    uint8_t magic_bytes_num;
+    compression_type_t compression;
+    bool set_compression();
+
+    bool plain_get(char* buf, uint64_t& len);
+
+    struct zstream_info* z_info;
+    bool zstream_get(char* buf, uint64_t& len);
 };
 
 struct Bufinfo {
"http://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/small17/" + "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/small17/" "data0016", "us-west-2", 2536018, "0fd502a303eb8f138f5916ec357721b1", 4, 1024 * 1024, 65536); } +TEST(S3Downloader, gzipped) { + S3DwonloadTest( + "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/gzipped/" + "data0001.gz", + "us-west-2", 328950, "b958fb80b98605a6095e6ebc4b9b4786", 3, 1024 * 1024, + 65536); +} + #ifdef BIG_FILE_TEST TEST(S3Downloader, bigfile) { - // InitLog(); - // s3ext_loglevel = EXT_DEBUG; - // s3ext_logtype = STDERR_LOG; - printf("Try downloading big file 1\n"); S3DwonloadTest( "http://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/hugefile/" "airlinedata1.csv", diff --git a/gpAux/extensions/gps3ext/src/s3wrapper_test.cpp b/gpAux/extensions/gps3ext/src/s3wrapper_test.cpp index d450d7fffd11e162e0ceb783c282394b8c2d8747..8cbd0f1560283d255a43c31506cad1285ad40f62 100644 --- a/gpAux/extensions/gps3ext/src/s3wrapper_test.cpp +++ b/gpAux/extensions/gps3ext/src/s3wrapper_test.cpp @@ -224,6 +224,19 @@ TEST(ExtWrapper, normal_2segs) { 64 * 1024, "db05de0ec7e0808268e2363d3572dc7f", 1, 2, 64 * 1024 * 1024); } +TEST(ExtWrapper, gzipped) { + ExtWrapperTest( + "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/gzipped/", + 64 * 1024, "7b2260e9a3a3f26e84aa28dc2124f68f", 0, 1, 123 * 1024); +} + +TEST(ExtWrapper, gzipped_normal1) { + ExtWrapperTest( + "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/" + "gzipped_normal1/", + 64 * 1024, "eacb7b210d3f7703ee06d16f520b103e", 0, 1, 64 * 1024 * 1024); +} + #ifdef BIG_FILE_TEST TEST(ExtWrapper, normal_3segs) { ExtWrapperTest( @@ -277,6 +290,13 @@ TEST(ExtWrapper, normal2_3segs) { "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/normal/", 64 * 1024, "00675684b6d6697571f22baaf407c6df", 2, 3, 64 * 1024 * 1024); } + +TEST(ExtWrapper, gzipped_normal2) { + ExtWrapperTest( + "https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/" + "gzipped_normal2/", + 64 * 1024, "a930794bc885bccf6eed45bd40367a7d", 0, 1, 64 * 1024 * 1024); +} #endif #endif // AWS_TEST diff --git a/gpAux/extensions/gps3ext/test/regress_1.08.sql b/gpAux/extensions/gps3ext/test/regress_1.08.sql index 5030d3065ac7a68a9df2f1987e084c38fa465e06..ceb81c899630fe2738041d89f368e8b4f8bdc646 100644 --- a/gpAux/extensions/gps3ext/test/regress_1.08.sql +++ b/gpAux/extensions/gps3ext/test/regress_1.08.sql @@ -15,7 +15,6 @@ CREATE PROTOCOL s3 ( select * from pg_extprotocol; drop external table s3example; --- create READABLE external table s3example (Year text, Month text, DayofMonth text, DayOfWeek text, DepTime text, CRSDepTime text, ArrTime text,CRSArrTime text, UniqueCarrier text, FlightNum text,TailNum text, ActualElapsedTime text, CRSElapsedTime text, AirTime text, ArrDelay text, DepDelay text, Origin text, Dest text, Distance text, TaxiIn text, TaxiOut text, Cancelled text, CancellationCode text, Diverted text, CarrierDelay text, WeatherDelay text, NASDelay text, SecurityDelay text, LateAircraftDelay text) location('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/hugefile config=/home/gpadmin/s3.conf') format 'csv' LOG ERRORS SEGMENT REJECT LIMIT 100 PERCENT; create READABLE external table s3example (Year text, Month text, DayofMonth text, DayOfWeek text, DepTime text, CRSDepTime text, ArrTime text,CRSArrTime text, UniqueCarrier text, FlightNum text,TailNum text, ActualElapsedTime text, CRSElapsedTime text, AirTime text, ArrDelay text, DepDelay text, Origin text, Dest text, Distance text, 
diff --git a/gpAux/extensions/gps3ext/test/regress_1.08.sql b/gpAux/extensions/gps3ext/test/regress_1.08.sql
index 5030d3065ac7a68a9df2f1987e084c38fa465e06..ceb81c899630fe2738041d89f368e8b4f8bdc646 100644
--- a/gpAux/extensions/gps3ext/test/regress_1.08.sql
+++ b/gpAux/extensions/gps3ext/test/regress_1.08.sql
@@ -15,7 +15,6 @@ CREATE PROTOCOL s3 (
 select * from pg_extprotocol;
 
 drop external table s3example;
--- create READABLE external table s3example (Year text, Month text, DayofMonth text, DayOfWeek text, DepTime text, CRSDepTime text, ArrTime text,CRSArrTime text, UniqueCarrier text, FlightNum text,TailNum text, ActualElapsedTime text, CRSElapsedTime text, AirTime text, ArrDelay text, DepDelay text, Origin text, Dest text, Distance text, TaxiIn text, TaxiOut text, Cancelled text, CancellationCode text, Diverted text, CarrierDelay text, WeatherDelay text, NASDelay text, SecurityDelay text, LateAircraftDelay text) location('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/hugefile config=/home/gpadmin/s3.conf') format 'csv' LOG ERRORS SEGMENT REJECT LIMIT 100 PERCENT;
 create READABLE external table s3example (Year text, Month text, DayofMonth text, DayOfWeek text, DepTime text, CRSDepTime text, ArrTime text,CRSArrTime text, UniqueCarrier text, FlightNum text,TailNum text, ActualElapsedTime text, CRSElapsedTime text, AirTime text, ArrDelay text, DepDelay text, Origin text, Dest text, Distance text, TaxiIn text, TaxiOut text, Cancelled text, CancellationCode text, Diverted text, CarrierDelay text, WeatherDelay text, NASDelay text, SecurityDelay text, LateAircraftDelay text) location('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/hugefile_correct_data/ config=/home/gpadmin/s3.conf') format 'csv' SEGMENT REJECT LIMIT 100 PERCENT;
 
 SELECT count(*) FROM s3example;
diff --git a/gpAux/extensions/gps3ext/test/regress_1.11.sql b/gpAux/extensions/gps3ext/test/regress_1.11.sql
new file mode 100644
index 0000000000000000000000000000000000000000..9fd4afcb551db4c6fb500aee5499ca688be4bb40
--- /dev/null
+++ b/gpAux/extensions/gps3ext/test/regress_1.11.sql
@@ -0,0 +1,35 @@
+-- ========
+-- PROTOCOL
+-- ========
+
+-- create the database function
+CREATE OR REPLACE FUNCTION read_from_s3() RETURNS integer AS
+    '$libdir/gps3ext.so', 's3_import' LANGUAGE C STABLE;
+
+-- declare the protocol name along with its read function
+CREATE PROTOCOL s3 (
+    readfunc = read_from_s3
+);
+
+-- check the catalog table
+select * from pg_extprotocol;
+
+drop external table s3example;
+create READABLE external table s3example (date text, time text, open float, high float,
+    low float, volume int) location('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/gzipped_normal1/ config=/home/gpadmin/s3.conf') FORMAT 'csv';
+
+SELECT count(*) FROM s3example;
+SELECT sum(open) FROM s3example;
+SELECT avg(open) FROM s3example;
+
+drop external table s3example;
+create READABLE external table s3example (Year text, Month text, DayofMonth text, DayOfWeek text, DepTime text, CRSDepTime text, ArrTime text,CRSArrTime text, UniqueCarrier text, FlightNum text,TailNum text, ActualElapsedTime text, CRSElapsedTime text, AirTime text, ArrDelay text, DepDelay text, Origin text, Dest text, Distance text, TaxiIn text, TaxiOut text, Cancelled text, CancellationCode text, Diverted text, CarrierDelay text, WeatherDelay text, NASDelay text, SecurityDelay text, LateAircraftDelay text) location('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/gzipped_normal2/ config=/home/gpadmin/s3.conf') format 'csv' SEGMENT REJECT LIMIT 100 PERCENT;
+
+SELECT count(*) FROM s3example;
+
+-- =======
+-- CLEANUP
+-- =======
+DROP EXTERNAL TABLE s3example;
+
+DROP PROTOCOL s3;