Commit afdc27f8 authored by Adam Lee

s3ext: add gzip decompression support

s3ext now recognizes and decompresses gzip-encoded files
automatically; no extra parameter, configuration, or filename
extension is required.
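Detection relies only on the gzip magic bytes (0x1f 0x8b) at the start of
the object, as implemented in Downloader::set_compression() below. A minimal
standalone sketch of the same check (a hypothetical helper, not code from
this commit):

#include <cstdint>
#include <cstdio>

// True if the buffer begins with the gzip magic bytes 0x1f 0x8b.
// At least two bytes are required, mirroring set_compression().
static bool is_gzip_header(const unsigned char *buf, uint64_t len) {
    return (len >= 2) && (buf[0] == 0x1f) && (buf[1] == 0x8b);
}

int main(int argc, char *argv[]) {
    if (argc < 2) return 1;
    FILE *fp = fopen(argv[1], "rb");
    if (fp == NULL) return 1;
    unsigned char magic[2] = {0, 0};
    size_t n = fread(magic, 1, sizeof(magic), fp);
    fclose(fp);
    printf("%s\n", is_gzip_header(magic, n) ? "gzip" : "plain");
    return 0;
}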
Parent 92e15420
......@@ -3,7 +3,7 @@ DEBUG_S3 = n
DEBUG_S3_CURL = n
# Flags
SHLIB_LINK = -lstdc++ -lxml2 -lpthread -lcrypto -lcurl -lz
PG_CPPFLAGS = -O2 -g -std=c++98 -fPIC -I$(libpq_srcdir) -Isrc -Ilib -I/usr/include/libxml2 -I$(libpq_srcdir)/postgresql/server/utils
ifeq ($(DEBUG_S3),y)
......
......@@ -3,12 +3,12 @@ DEBUG_S3 = y
DEBUG_S3_CURL = n
AWS_TEST = y
FAKE_TEST = y
BIG_FILE_TEST = n
# Flags
CPP = g++
INCLUDES = -Isrc -Ilib -I/usr/include/libxml2
LDFLAGS = -lpthread -lcrypto -lcurl -lxml2 -lgcov -lz
CFLAGS = -O2 -g3 -std=c++98 -fPIC -fprofile-arcs -ftest-coverage
ifeq ($(DEBUG_S3),y)
......
......@@ -9,6 +9,7 @@
#include <curl/curl.h>
#include <libxml/parser.h>
#include <libxml/tree.h>
#include <zlib.h>
#include "gps3ext.h"
#include "s3downloader.h"
......@@ -84,14 +85,24 @@ bool BlockingBuffer::Init() {
// ret < len means EMPTY
uint64_t BlockingBuffer::Read(char *buf, uint64_t len) {
// QueryCancelPending stops s3_import(). This check would be unnecessary
// if s3_import() called BlockingBuffer->Read() only once per invocation;
// otherwise (as Downloader->get() does for the decompression feature),
// the first call sets the buffer to STATUS_EMPTY and the second call hangs.
if (QueryCancelPending) {
S3INFO("Buffer reading is interrupted by GPDB");
return 0;
}
// assert buf not null
// assert len > 0, len < this->bufcap
pthread_mutex_lock(&this->stat_mutex);
while (this->status == BlockingBuffer::STATUS_EMPTY) {
pthread_cond_wait(&this->stat_cond, &this->stat_mutex);
}
uint64_t left_data_length = this->realsize - this->readpos;
uint64_t length_to_read = std::min(len, left_data_length);
memcpy(buf, this->bufferdata + this->readpos, length_to_read);
if (left_data_length >= len) {
......@@ -212,7 +223,13 @@ void *DownloadThreadfunc(void *data) {
}
Downloader::Downloader(uint8_t part_num)
: num(part_num),
o(NULL),
chunkcount(0),
readlen(0),
magic_bytes_num(0),
compression(S3_ZIP_NONE),
z_info(NULL) {
this->threads = (pthread_t *)malloc(num * sizeof(pthread_t));
if (this->threads)
memset((void *)this->threads, 0, num * sizeof(pthread_t));
......@@ -250,23 +267,89 @@ bool Downloader::init(string url, string region, uint64_t size,
pthread_create(&this->threads[i], NULL, DownloadThreadfunc,
this->buffers[i]);
}
readlen = 0;
chunkcount = 0;
memset(this->magic_bytes, 0, sizeof(this->magic_bytes));
return true;
}
bool Downloader::set_compression() {
if ((this->magic_bytes[0] == 0x1f) && (this->magic_bytes[1] == 0x8b)) {
this->compression = S3_ZIP_GZIP;
this->z_info = new zstream_info();
if (!this->z_info) {
S3ERROR("Failed to allocate memory");
return false;
}
this->z_info->inited = false;
this->z_info->in = NULL;
this->z_info->out = NULL;
this->z_info->done_out = 0;
this->z_info->have_out = 0;
} else {
this->compression = S3_ZIP_NONE;
}
return true;
}
bool Downloader::get(char *data, uint64_t &len) {
if (this->magic_bytes_num == 0) {
// read the first 4 (at least 2) bytes to check whether this file is compressed
BlockingBuffer *buf = buffers[this->chunkcount % this->num];
if ((this->magic_bytes_num = buf->Read(
(char *)this->magic_bytes, sizeof(this->magic_bytes))) > 1) {
if (!this->set_compression()) {
return false;
}
}
}
switch (this->compression) {
case S3_ZIP_GZIP:
return this->zstream_get(data, len);
break;
default:
return this->plain_get(data, len);
}
}
bool Downloader::plain_get(char *data, uint64_t &len) {
uint64_t filelen = this->o->Size();
uint64_t tmplen = 0;
RETRY:
// confirm there is no more available data, done with this file
if (this->readlen == filelen) {
len = 0;
return true;
}
BlockingBuffer *buf = buffers[this->chunkcount % this->num];
// get data from this->magic_bytes, or buf->Read(), or both
if (this->readlen < this->magic_bytes_num) {
if ((this->readlen + len) <= this->magic_bytes_num) {
memcpy(data, this->magic_bytes + this->readlen, len);
tmplen = len;
} else {
memcpy(data, this->magic_bytes + this->readlen,
this->magic_bytes_num - this->readlen);
tmplen = this->magic_bytes_num - this->readlen +
buf->Read(data + this->magic_bytes_num - this->readlen,
this->readlen + len - this->magic_bytes_num);
}
} else {
tmplen = buf->Read(data, len);
}
this->readlen += tmplen;
if (tmplen < len) {
this->chunkcount++;
if (buf->Error()) {
......@@ -286,21 +369,166 @@ RETRY:
return true;
}
bool Downloader::zstream_get(char *data, uint64_t &len) {
uint64_t filelen = this->o->Size();
// S3_ZIP_CHUNKSIZE is simply the buffer size for feeding data to and
// pulling data from the zlib routines. 256K is recommended by zlib.
#define S3_ZIP_CHUNKSIZE 256 * 1024
uint32_t left_out = 0;
zstream_info *zinfo = this->z_info;
z_stream *strm = &zinfo->zstream;
RETRY:
// fail-safe, in case (very unlikely) there is an infinite-loop bug; for
// instance, S3 returning a wrong file size that is larger than the actual
// number. It has never happened, but better to be careful.
if (this->chunkcount > (this->o->Size() / this->o->Chunksize() + 2)) {
if (zinfo->inited) {
inflateEnd(strm);
}
len = 0;
return false;
}
// no more available data to read, decompress or copy, done with this file
if (this->readlen == filelen && !(zinfo->have_out - zinfo->done_out) &&
!strm->avail_in) {
if (zinfo->inited) {
inflateEnd(strm);
}
len = 0;
return true;
}
BlockingBuffer *buf = buffers[this->chunkcount % this->num];
// strm is the structure zlib uses to decompress the stream
if (!zinfo->inited) {
strm->zalloc = Z_NULL;
strm->zfree = Z_NULL;
strm->opaque = Z_NULL;
strm->avail_in = 0;
strm->next_in = Z_NULL;
zinfo->in = (unsigned char *)malloc(S3_ZIP_CHUNKSIZE);
zinfo->out = (unsigned char *)malloc(S3_ZIP_CHUNKSIZE);
if (!zinfo->in || !zinfo->out) {
S3ERROR("Failed to allocate memory");
return false;
}
// windowBits = 47 (32 + 15) enables automatic gzip/zlib header detection,
// so zlib can recognize and decode the gzip stream
if (inflateInit2(strm, 47) != Z_OK) {
S3ERROR("Failed to init gzip function");
return false;
}
zinfo->inited = true;
}
do {
// copy decompressed data
left_out = zinfo->have_out - zinfo->done_out;
if (left_out > len) {
memcpy(data, zinfo->out + zinfo->done_out, len);
zinfo->done_out += len;
break;
} else if (left_out) {
memcpy(data, zinfo->out + zinfo->done_out, left_out);
zinfo->done_out = 0;
zinfo->have_out = 0;
len = left_out;
break;
}
// get another decompressed chunk
if (this->readlen && (strm->avail_in != 0)) {
strm->avail_out = S3_ZIP_CHUNKSIZE;
strm->next_out = zinfo->out;
switch (inflate(strm, Z_NO_FLUSH)) {
case Z_STREAM_ERROR:
case Z_NEED_DICT:
case Z_DATA_ERROR:
case Z_MEM_ERROR:
S3ERROR("Failed to decompress data");
inflateEnd(strm);
return false;
}
zinfo->have_out = S3_ZIP_CHUNKSIZE - strm->avail_out;
}
// get another compressed chunk
// from magic_bytes, or buf->Read(), or both
if (!zinfo->have_out) {
if (this->readlen < this->magic_bytes_num) {
memcpy(zinfo->in, this->magic_bytes + this->readlen,
this->magic_bytes_num - this->readlen);
strm->avail_in =
this->magic_bytes_num - this->readlen +
buf->Read((char *)zinfo->in + this->magic_bytes_num -
this->readlen,
S3_ZIP_CHUNKSIZE - this->magic_bytes_num +
this->readlen);
} else {
strm->avail_in = buf->Read((char *)zinfo->in, S3_ZIP_CHUNKSIZE);
}
if (buf->Error()) {
S3ERROR("Error occurs while downloading, skip");
inflateEnd(strm);
return false;
}
strm->next_in = zinfo->in;
// readlen is the number of bytes read from the original file, not the
// decompressed size
this->readlen += strm->avail_in;
// done with *reading* this compressed file; still need to confirm it has
// all been decompressed and returned via get()
if (strm->avail_in == 0) {
this->chunkcount++;
goto RETRY;
}
}
} while (1);
return true;
}
void Downloader::destroy() {
for (int i = 0; i < this->num; i++) {
if (this->threads && this->threads[i]) pthread_cancel(this->threads[i]);
}
for (int i = 0; i < this->num; i++) {
if (this->threads && this->threads[i])
pthread_join(this->threads[i], NULL);
if (this->buffers && this->buffers[i]) delete this->buffers[i];
}
if (this->o) delete this->o;
}
Downloader::~Downloader() {
if (this->threads) free(this->threads);
if (this->buffers) free(this->buffers);
if (this->z_info) {
if (this->z_info->in) {
free(this->z_info->in);
this->z_info->in = NULL;
}
if (this->z_info->out) {
free(this->z_info->out);
this->z_info->out = NULL;
}
delete this->z_info;
this->z_info = NULL;
}
}
// return the number of items
......@@ -622,7 +850,7 @@ static bool extractContent(ListBucketResult *result, xmlNode *root_element,
S3ERROR("Faild to create item for %s", key);
}
} else {
S3INFO("Size of %s is " PRIu64 ", skip it", key, size);
S3INFO("Size of \"%s\" is %" PRIu64 ", skip it", key, size);
}
}
......
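For reference, zstream_get() above follows the canonical zlib inflate loop
(zpipe-style). A condensed, self-contained version of the same pattern,
reading a gzip stream from stdin and writing plain data to stdout (buffering
and error handling simplified for illustration; not code from this commit):

#include <cstdio>
#include <zlib.h>

#define CHUNK (256 * 1024)  // same 256K chunk size as S3_ZIP_CHUNKSIZE

int main(void) {
    static unsigned char in[CHUNK], out[CHUNK];
    z_stream strm = {};
    // windowBits = 47 (32 + 15): auto-detect gzip/zlib headers, matching
    // the inflateInit2() call in Downloader::zstream_get()
    if (inflateInit2(&strm, 47) != Z_OK) return 1;
    int ret = Z_OK;
    do {
        strm.avail_in = fread(in, 1, CHUNK, stdin);
        if (strm.avail_in == 0) break;  // EOF before end of stream
        strm.next_in = in;
        do {  // drain all output produced from this input chunk
            strm.avail_out = CHUNK;
            strm.next_out = out;
            ret = inflate(&strm, Z_NO_FLUSH);
            if (ret == Z_STREAM_ERROR || ret == Z_NEED_DICT ||
                ret == Z_DATA_ERROR || ret == Z_MEM_ERROR) {
                inflateEnd(&strm);
                return 1;
            }
            fwrite(out, 1, CHUNK - strm.avail_out, stdout);
        } while (strm.avail_out == 0);
    } while (ret != Z_STREAM_END);
    inflateEnd(&strm);
    return ret == Z_STREAM_END ? 0 : 1;
}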
......@@ -17,6 +17,7 @@
#include <cstring>
#include <curl/curl.h>
#include <zlib.h>
#include "s3common.h"
......@@ -28,6 +29,11 @@ struct Range {
uint64_t len;
};
typedef enum compression_type {
S3_ZIP_NONE,
S3_ZIP_GZIP,
} compression_type_t;
class OffsetMgr {
public:
OffsetMgr(uint64_t maxsize, uint64_t chunksize);
......@@ -79,15 +85,24 @@ class BlockingBuffer {
Range nextpos;
};
struct zstream_info {
z_stream zstream;
bool inited;
unsigned char* in;
unsigned char* out;
uint64_t have_out;
uint64_t done_out;
};
class Downloader {
public:
Downloader(uint8_t part_num);
~Downloader();
bool init(string url, string region, uint64_t size, uint64_t chunksize,
S3Credential* pcred);
bool get(char* buf, uint64_t& len);
void destroy();
// reset
// init(url)
private:
const uint8_t num;
pthread_t* threads;
......@@ -95,6 +110,16 @@ struct Downloader {
OffsetMgr* o;
uint64_t chunkcount;
uint64_t readlen;
unsigned char magic_bytes[4];
uint8_t magic_bytes_num;
compression_type_t compression;
bool set_compression();
bool plain_get(char* buf, uint64_t& len);
struct zstream_info* z_info;
bool zstream_get(char* buf, uint64_t& len);
};
struct Bufinfo {
......
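A hypothetical caller sketch for the Downloader interface declared above
(the region, object size, and chunk size are illustrative placeholders;
only the class and method signatures come from this header):

#include "s3downloader.h"

void download_object(const string &url, uint64_t file_size,
                     S3Credential *pcred) {
    Downloader d(4);  // 4 concurrent download threads
    if (d.init(url, "us-west-2", file_size, 64 * 1024 * 1024, pcred)) {
        char buf[64 * 1024];
        uint64_t len;
        do {
            len = sizeof(buf);            // in: capacity; out: bytes returned
            if (!d.get(buf, len)) break;  // gzip objects are decompressed
                                          // transparently inside get()
            // consume len bytes of plain data from buf here
        } while (len > 0);
    }
    d.destroy();
}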
......@@ -242,14 +242,12 @@ void S3DwonloadTest(const char *url, const char *region, uint64_t file_size,
}
TEST(S3Downloader, simple) {
printf("Try downloading data0014\n");
S3DwonloadTest(
"http://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/small17/"
"data0014",
"us-west-2", 4420346, "68c4a63b721e7af0ae945ce109ca87ad", 4,
1024 * 1024, 65536);
printf("Try downloading data0016\n");
S3DwonloadTest(
"http://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/small17/"
"data0016",
......@@ -258,27 +256,29 @@ TEST(S3Downloader, simple) {
}
TEST(S3Downloader, httpssimple) {
printf("Try downloading data0014\n");
S3DwonloadTest(
"http://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/small17/"
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/small17/"
"data0014",
"us-west-2", 4420346, "68c4a63b721e7af0ae945ce109ca87ad", 4,
1024 * 1024, 65536);
printf("Try downloading data0016\n");
S3DwonloadTest(
"http://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/small17/"
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/small17/"
"data0016",
"us-west-2", 2536018, "0fd502a303eb8f138f5916ec357721b1", 4,
1024 * 1024, 65536);
}
TEST(S3Downloader, gzipped) {
S3DwonloadTest(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/gzipped/"
"data0001.gz",
"us-west-2", 328950, "b958fb80b98605a6095e6ebc4b9b4786", 3, 1024 * 1024,
65536);
}
#ifdef BIG_FILE_TEST
TEST(S3Downloader, bigfile) {
// InitLog();
// s3ext_loglevel = EXT_DEBUG;
// s3ext_logtype = STDERR_LOG;
printf("Try downloading big file 1\n");
S3DwonloadTest(
"http://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/hugefile/"
"airlinedata1.csv",
......
......@@ -224,6 +224,19 @@ TEST(ExtWrapper, normal_2segs) {
64 * 1024, "db05de0ec7e0808268e2363d3572dc7f", 1, 2, 64 * 1024 * 1024);
}
TEST(ExtWrapper, gzipped) {
ExtWrapperTest(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/gzipped/",
64 * 1024, "7b2260e9a3a3f26e84aa28dc2124f68f", 0, 1, 123 * 1024);
}
TEST(ExtWrapper, gzipped_normal1) {
ExtWrapperTest(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/"
"gzipped_normal1/",
64 * 1024, "eacb7b210d3f7703ee06d16f520b103e", 0, 1, 64 * 1024 * 1024);
}
#ifdef BIG_FILE_TEST
TEST(ExtWrapper, normal_3segs) {
ExtWrapperTest(
......@@ -277,6 +290,13 @@ TEST(ExtWrapper, normal2_3segs) {
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/normal/",
64 * 1024, "00675684b6d6697571f22baaf407c6df", 2, 3, 64 * 1024 * 1024);
}
TEST(ExtWrapper, gzipped_normal2) {
ExtWrapperTest(
"https://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/"
"gzipped_normal2/",
64 * 1024, "a930794bc885bccf6eed45bd40367a7d", 0, 1, 64 * 1024 * 1024);
}
#endif
#endif // AWS_TEST
......
......@@ -15,7 +15,6 @@ CREATE PROTOCOL s3 (
select * from pg_extprotocol;
drop external table s3example;
create READABLE external table s3example (Year text, Month text, DayofMonth text, DayOfWeek text, DepTime text, CRSDepTime text, ArrTime text,CRSArrTime text, UniqueCarrier text, FlightNum text,TailNum text, ActualElapsedTime text, CRSElapsedTime text, AirTime text, ArrDelay text, DepDelay text, Origin text, Dest text, Distance text, TaxiIn text, TaxiOut text, Cancelled text, CancellationCode text, Diverted text, CarrierDelay text, WeatherDelay text, NASDelay text, SecurityDelay text, LateAircraftDelay text) location('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/hugefile_correct_data/ config=/home/gpadmin/s3.conf') format 'csv' SEGMENT REJECT LIMIT 100 PERCENT;
SELECT count(*) FROM s3example;
......
-- ========
-- PROTOCOL
-- ========
-- create the database functions
CREATE OR REPLACE FUNCTION read_from_s3() RETURNS integer AS
'$libdir/gps3ext.so', 's3_import' LANGUAGE C STABLE;
-- declare the protocol name along with in/out funcs
CREATE PROTOCOL s3 (
readfunc = read_from_s3
);
-- Check out the catalog table
select * from pg_extprotocol;
drop external table s3example;
create READABLE external table s3example (date text, time text, open float, high float,
low float, volume int) location('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset1/gzipped_normal1/ config=/home/gpadmin/s3.conf') FORMAT 'csv';
SELECT count(*) FROM s3example;
SELECT sum(open) FROM s3example;
SELECT avg(open) FROM s3example;
drop external table s3example;
create READABLE external table s3example (Year text, Month text, DayofMonth text, DayOfWeek text, DepTime text, CRSDepTime text, ArrTime text,CRSArrTime text, UniqueCarrier text, FlightNum text,TailNum text, ActualElapsedTime text, CRSElapsedTime text, AirTime text, ArrDelay text, DepDelay text, Origin text, Dest text, Distance text, TaxiIn text, TaxiOut text, Cancelled text, CancellationCode text, Diverted text, CarrierDelay text, WeatherDelay text, NASDelay text, SecurityDelay text, LateAircraftDelay text) location('s3://s3-us-west-2.amazonaws.com/s3test.pivotal.io/dataset2/gzipped_normal2/ config=/home/gpadmin/s3.conf') format 'csv' SEGMENT REJECT LIMIT 100 PERCENT;
SELECT count(*) FROM s3example;
-- =======
-- CLEANUP
-- =======
DROP EXTERNAL TABLE s3example;
DROP PROTOCOL s3;