Commit 29becc61 authored by Kuien Liu, committed by Adam Lee

s3ext: able to write a big row larger than chunkbuffer

Refactor compress_writer and s3key_writer to support
rows larger than the chunk buffer, and fix a corner case
where the compression output is larger than the input.

Add two regression tests that generate rows of mixed lengths,
some of which are larger than the chunk buffer.
Signed-off-by: Peifeng Qiu <pqiu@pivotal.io>
Parent 3a076df1
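For illustration, a minimal standalone sketch of the pattern this change applies (CHUNK_SIZE, writeAll and the demo writeOneChunk are assumed names for this sketch, not the extension's real code): a write larger than the chunk buffer is split into chunk-sized calls, with one final short call for the remainder.

#include <cstdint>
#include <cstdio>
#include <cstring>

static const uint64_t CHUNK_SIZE = 16;  // tiny value, just for the demo

// Stand-in for a writer that accepts at most CHUNK_SIZE bytes per call.
static uint64_t writeOneChunk(const char *buf, uint64_t count) {
    (void)buf;  // unused in this demo
    printf("writeOneChunk: %llu bytes\n", (unsigned long long)count);
    return count;
}

// Split an arbitrarily large buffer into chunk-sized pieces, the way the
// refactored write() below does with S3_ZIP_COMPRESS_CHUNKSIZE.
static uint64_t writeAll(const char *buf, uint64_t count) {
    uint64_t written = 0;
    for (uint64_t i = 0; i < count / CHUNK_SIZE; i++) {
        written += writeOneChunk(buf + written, CHUNK_SIZE);
    }
    if (written < count) {
        written += writeOneChunk(buf + written, count - written);  // remainder
    }
    return written;
}

int main() {
    char data[40];
    memset(data, 'x', sizeof(data));
    writeAll(data, sizeof(data));  // prints 16, 16, 8
    return 0;
}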
......@@ -15,8 +15,8 @@ class CompressWriter : public Writer {
virtual void open(const WriterParams &params);
// write() attempts to write up to count bytes from the buffer.
// Always return 0 if EOF, no matter how many times it's invoked. Throw exception if encounters
// errors.
// If 'count' is larger than the Zip chunk buffer, it invokes writeOneChunk()
// repeatedly to finish the upload. Throws an exception on errors.
virtual uint64_t write(const char *buf, uint64_t count);
// This should be reentrant, has no side effects when called multiple times.
......@@ -26,6 +26,7 @@ class CompressWriter : public Writer {
private:
void flush();
uint64_t writeOneChunk(const char *buf, uint64_t count);
Writer *writer;
......
CREATE WRITABLE EXTERNAL TABLE s3write_bigrow_write (id serial, content text)
LOCATION('s3://s3-us-west-2.amazonaws.com/@write_prefix@/bigrow/ config=@config_file@') FORMAT 'csv';
-- Pangram: 'Pack my box with five dozen liquor jugs.' 40 chars, 32 letters
-- Total 40KB
INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 ));
-- Total 40MB
INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024));
-- Total 80MB
INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024 * 2));
-- Total 200MB
-- INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
-- repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024 * 5));
-- Total 320MB, CRASH!! Report on https://github.com/greenplum-db/gpdb/issues/1090
-- INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
-- repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024 * 8));
DROP EXTERNAL TABLE IF EXISTS s3write_bigrow_write;
CREATE TEMP TABLE s3write_mixedlenrows_temp (id serial, content text);
CREATE WRITABLE EXTERNAL TABLE s3write_mixedlenrows_write (id serial, content text)
LOCATION('s3://s3-us-west-2.amazonaws.com/@write_prefix@/mixedlenrows/ config=@config_file@') FORMAT 'csv';
-- PREPARE --
-------------
-- Pangram: 'Pack my box with five dozen liquor jugs.' 40 chars, 32 letters
-- Total 40KB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 ));
-- Total 1MB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 25 ));
-- Total 40MB, bigger than chunksize (default is 16MB in tests)
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024));
-- Total 40KB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 ));
-- Total 1MB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 25 ));
-- Total 40KB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 ));
-- Total 10MB, smaller than chunksize (default is 16MB in tests)
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 25 * 10));
-- WRITE OUT --
---------------
INSERT INTO s3write_mixedlenrows_write
SELECT * FROM s3write_mixedlenrows_temp;
-- VERIFY IT --
---------------
CREATE READABLE EXTERNAL TABLE s3write_mixedlenrows_read (id serial, content text)
LOCATION('s3://s3-us-west-2.amazonaws.com/@write_prefix@/mixedlenrows/ config=@config_file@') FORMAT 'csv';
SELECT count(*) FROM s3write_mixedlenrows_read;
DROP EXTERNAL TABLE IF EXISTS s3write_mixedlenrows_write;
DROP EXTERNAL TABLE IF EXISTS s3write_mixedlenrows_read;
CREATE WRITABLE EXTERNAL TABLE s3write_bigrow_write (id serial, content text)
LOCATION('s3://s3-us-west-2.amazonaws.com/@write_prefix@/bigrow/ config=@config_file@') FORMAT 'csv';
NOTICE: CREATE EXTERNAL TABLE will create implicit sequence "s3write_bigrow_write_id_seq" for serial column "s3write_bigrow_write.id"
-- Pangram: 'Pack my box with five dozen liquor jugs.' 40 chars, 32 letters
-- Total 40KB
INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 ));
-- Total 40MB
INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024));
-- Total 80MB
INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024 * 2));
-- Total 200MB
-- INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
-- repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024 * 5));
-- Total 320MB, CRASH!! Report on https://github.com/greenplum-db/gpdb/issues/1090
-- INSERT INTO s3write_bigrow_write VALUES (DEFAULT,
-- repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024 * 8));
DROP EXTERNAL TABLE IF EXISTS s3write_bigrow_write;
CREATE TEMP TABLE s3write_mixedlenrows_temp (id serial, content text);
NOTICE: CREATE TABLE will create implicit sequence "s3write_mixedlenrows_temp_id_seq" for serial column "s3write_mixedlenrows_temp.id"
NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'id' as the Greenplum Database data distribution key for this table.
HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew.
CREATE WRITABLE EXTERNAL TABLE s3write_mixedlenrows_write (id serial, content text)
LOCATION('s3://s3-us-west-2.amazonaws.com/@write_prefix@/mixedlenrows/ config=@config_file@') FORMAT 'csv';
NOTICE: CREATE EXTERNAL TABLE will create implicit sequence "s3write_mixedlenrows_write_id_seq" for serial column "s3write_mixedlenrows_write.id"
-- PREPARE --
-------------
-- Pangram: 'Pack my box with five dozen liquor jugs.' 40 chars, 32 letters
-- Total 40KB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 ));
-- Total 1MB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 25 ));
-- Total 40MB, bigger than chunksize (default is 16MB in tests)
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 1024));
-- Total 40KB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 ));
-- Total 1MB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 25 ));
-- Total 40KB
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 ));
-- Total 10MB, smaller than chunksize (default is 16MB in tests)
INSERT INTO s3write_mixedlenrows_temp VALUES (DEFAULT,
repeat('Pack my box with five dozen liquor jugs.', 1024 * 25 * 10));
-- WRITE OUT --
---------------
INSERT INTO s3write_mixedlenrows_write
SELECT * FROM s3write_mixedlenrows_temp;
-- VERIFY IT --
---------------
CREATE READABLE EXTERNAL TABLE s3write_mixedlenrows_read (id serial, content text)
LOCATION('s3://s3-us-west-2.amazonaws.com/@write_prefix@/mixedlenrows/ config=@config_file@') FORMAT 'csv';
NOTICE: CREATE EXTERNAL TABLE will create implicit sequence "s3write_mixedlenrows_read_id_seq" for serial column "s3write_mixedlenrows_read.id"
SELECT count(*) FROM s3write_mixedlenrows_read;
count
-------
7
(1 row)
DROP EXTERNAL TABLE IF EXISTS s3write_mixedlenrows_write;
DROP EXTERNAL TABLE IF EXISTS s3write_mixedlenrows_read;
test: 0_00_prepare_s3_protocol
# tens of seconds
test: 1_01_normal 1_02_log_error 1_10_all_regions 1_11_gzipped_data 1_12_no_prefix 1_13_parallel1 1_13_parallel2 1_09_partition
test: 1_01_normal 1_02_log_error 1_10_all_regions 1_11_gzipped_data 1_12_no_prefix 1_13_parallel1 1_13_parallel2 1_09_partition 3_09_write_big_row 3_10_write_mixed_length_rows
# ~ 1s
test: 1_03_bad_data 1_04_empty_prefix 1_05_one_line 1_06_1correct_1wrong 2_01_invalid_syntax 2_02_invalid_region 2_03_invalid_config 2_04_invalid_header 3_01_create_wet 3_02_insert_quick_shoot_wet 4_01_create_invalid_wet
test: 1_03_bad_data 1_04_empty_prefix 1_05_one_line 1_06_1correct_1wrong 2_01_invalid_syntax 2_02_invalid_region 2_03_invalid_config 2_04_invalid_header 3_01_create_wet 3_02_quick_shoot_wet 4_01_create_invalid_wet
# heavy loads, > 100s
test: 1_07_huge_bad_data 1_08_huge_correct_data 1_14_thousands_of_files 3_03_insert_lots_of_rows 3_07_write_lots_of_files 3_04_insert_mixed_workload 3_05_insert_to_wet_from_ret 3_06_special_characters 3_08_join_query_wet_local_tbl 4_02_wet_with_mixed_format
......
......@@ -37,29 +37,55 @@ void CompressWriter::open(const WriterParams& params) {
this->isClosed = false;
}
uint64_t CompressWriter::write(const char* buf, uint64_t count) {
uint64_t CompressWriter::writeOneChunk(const char* buf, uint64_t count) {
// Defensive code
if (buf == NULL || count == 0) {
return 0;
}
// we assume data block from upper layer is always smaller than gzip chunkbuffer
CHECK_OR_DIE_MSG(count < S3_ZIP_COMPRESS_CHUNKSIZE,
"Data size %" PRIu64 " is larger than S3_ZIP_COMPRESS_CHUNKSIZE", count);
this->zstream.next_in = (Byte*)buf;
this->zstream.avail_in = count;
int status = deflate(&this->zstream, Z_NO_FLUSH);
if (status < 0 && status != Z_BUF_ERROR) {
deflateEnd(&this->zstream);
CHECK_OR_DIE_MSG(false, "Failed to compress data: %d, %s", status, this->zstream.msg);
}
int status;
do {
status = deflate(&this->zstream, Z_NO_FLUSH);
if (status < 0 && status != Z_BUF_ERROR) {
deflateEnd(&this->zstream);
CHECK_OR_DIE_MSG(false, "Failed to compress data: %d, %s", status, this->zstream.msg);
}
this->flush();
this->flush();
// The output buffer is the same size as the input buffer; in most cases the
// data shrinks after compression. But if this->zstream.avail_in > 0, the
// compressed output is larger than the input and some input is still pending.
// This happens, for example, when compressing already-compressed data, so we
// need to loop here.
} while (status == Z_OK && this->zstream.avail_in > 0);
return count;
}
uint64_t CompressWriter::write(const char* buf, uint64_t count) {
// Defensive code
if (buf == NULL || count == 0) {
return 0;
}
uint64_t ret = 0;
for (uint64_t i = 0; i < count / S3_ZIP_COMPRESS_CHUNKSIZE; i++) {
ret += this->writeOneChunk(buf + ret, S3_ZIP_COMPRESS_CHUNKSIZE);
}
if (ret < count) {
ret += this->writeOneChunk(buf + ret, count - ret);
}
return ret;
}
void CompressWriter::close() {
if (this->isClosed) {
return;
......
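To see why the deflate() call above runs in a loop, here is a small standalone zlib sketch (illustrative only; CHUNK and the buffer sizes are assumptions): the output buffer is deliberately the same size as one input chunk, so incompressible input, such as already-compressed data, can fill it before all the input is consumed and leave avail_in > 0.

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <vector>
#include <zlib.h>

int main() {
    const size_t CHUNK = 16 * 1024;  // stand-in for S3_ZIP_COMPRESS_CHUNKSIZE
    std::vector<unsigned char> in(CHUNK * 4);
    for (size_t i = 0; i < in.size(); i++) in[i] = rand() & 0xFF;  // hard-to-compress payload

    z_stream zs;
    memset(&zs, 0, sizeof(zs));
    // windowBits + 16 selects the gzip wrapper
    if (deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, MAX_WBITS + 16, 8,
                     Z_DEFAULT_STRATEGY) != Z_OK) return 1;

    std::vector<unsigned char> out(CHUNK);  // output buffer same size as one input chunk
    std::vector<unsigned char> compressed;

    // Feed one chunk at a time, looping deflate() until the chunk is fully consumed.
    for (size_t off = 0; off < in.size(); off += CHUNK) {
        zs.next_in = in.data() + off;
        zs.avail_in = CHUNK;
        int status;
        do {
            zs.next_out = out.data();
            zs.avail_out = out.size();
            status = deflate(&zs, Z_NO_FLUSH);
            if (status < 0 && status != Z_BUF_ERROR) return 1;
            compressed.insert(compressed.end(), out.data(),
                              out.data() + (out.size() - zs.avail_out));
        } while (status == Z_OK && zs.avail_in > 0);  // pending input means the output grew
    }

    // Finish the gzip stream.
    int status;
    do {
        zs.next_out = out.data();
        zs.avail_out = out.size();
        status = deflate(&zs, Z_FINISH);
        if (status == Z_STREAM_ERROR) return 1;
        compressed.insert(compressed.end(), out.data(),
                          out.data() + (out.size() - zs.avail_out));
    } while (status != Z_STREAM_END);
    deflateEnd(&zs);

    printf("in=%zu bytes, gzip out=%zu bytes (output can exceed input)\n",
           in.size(), compressed.size());
    return 0;
}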
......@@ -20,25 +20,26 @@ void S3KeyWriter::open(const WriterParams& params) {
S3DEBUG("key: %s, upload id: %s", this->url.c_str(), this->uploadId.c_str());
}
// write() attempts to write up to count bytes from the buffer.
// Always return 0 if EOF, no matter how many times it's invoked.
// Throw exception if encounters errors.
// write() first fills up the data buffer before flushing it out
uint64_t S3KeyWriter::write(const char* buf, uint64_t count) {
// Defensive code
CHECK_OR_DIE(buf != NULL);
this->checkQueryCancelSignal();
// GPDB issues 64K- block every time and chunkSize is 8MB+
if (count > this->chunkSize) {
S3ERROR("%" PRIu64 " is larger than chunkSize %" PRIu64, count, this->chunkSize);
CHECK_OR_DIE_MSG(false, "%" PRIu64 " is larger than chunkSize %" PRIu64, count,
this->chunkSize);
}
uint64_t offset = 0;
while (offset < count) {
uint64_t bufferRemain = this->chunkSize - this->buffer.size();
uint64_t dataRemain = count - offset;
uint64_t dataToBuffer = bufferRemain < dataRemain ? bufferRemain : dataRemain;
if ((this->buffer.size() + count) > this->chunkSize) {
flushBuffer();
}
this->buffer.insert(this->buffer.end(), buf + offset, buf + offset + dataToBuffer);
this->buffer.insert(this->buffer.end(), buf, buf + count);
if (this->buffer.size() == this->chunkSize) {
this->flushBuffer();
}
offset += dataToBuffer;
}
return count;
}
......@@ -165,6 +166,7 @@ void S3KeyWriter::completeKeyWriting() {
S3DEBUG("Segment %d has finished uploading \"%s\"", s3ext_segid, this->url.c_str());
this->buffer.clear();
this->etagList.clear();
this->uploadId.clear();
}
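As a companion to the S3KeyWriter change above, a minimal sketch of the buffer-and-flush pattern (ChunkedUploader is a hypothetical class, not the extension's API): data of any size is packed into a fixed-size chunk buffer, a part is flushed whenever the buffer fills, and the remainder goes out on close, mirroring how a 0x101-byte write against a 0x100-byte chunk size produces two parts in the test below.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

class ChunkedUploader {
   public:
    explicit ChunkedUploader(uint64_t chunkSize) : chunkSize(chunkSize), partNumber(0) {}

    uint64_t write(const char *buf, uint64_t count) {
        uint64_t offset = 0;
        while (offset < count) {
            uint64_t bufferRemain = chunkSize - buffer.size();
            uint64_t dataToBuffer = std::min<uint64_t>(bufferRemain, count - offset);
            buffer.insert(buffer.end(), buf + offset, buf + offset + dataToBuffer);
            if (buffer.size() == chunkSize) {
                flushBuffer();  // emit one full part
            }
            offset += dataToBuffer;
        }
        return count;
    }

    void close() {
        if (!buffer.empty()) {
            flushBuffer();  // final, possibly short, part
        }
    }

   private:
    void flushBuffer() {
        printf("uploading part %d: %zu bytes\n", ++partNumber, buffer.size());
        buffer.clear();
    }

    uint64_t chunkSize;
    int partNumber;
    std::vector<char> buffer;
};

int main() {
    ChunkedUploader uploader(0x100);
    std::vector<char> data(0x101, 'x');        // one byte larger than the chunk size
    uploader.write(data.data(), data.size());  // flushes a 256-byte part mid-write
    uploader.close();                          // flushes the 1-byte remainder
    return 0;
}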
#include <string>
#include <vector>
#include "compress_writer.cpp"
......@@ -5,6 +6,7 @@
#include "s3macros.h"
using std::vector;
using std::string;
class MockWriter : public Writer {
public:
......@@ -60,6 +62,10 @@ class CompressWriterTest : public testing::Test {
}
void simpleUncompress(const char *input, uint64_t len) {
this->coreUncompress((Byte *)input, len, this->out, S3_ZIP_COMPRESS_CHUNKSIZE);
}
void coreUncompress(Byte *input, uint64_t len, Byte *output, uint64_t out_len) {
z_stream zstream;
// allocate inflate state for zlib
......@@ -71,9 +77,9 @@ class CompressWriterTest : public testing::Test {
CHECK_OR_DIE_MSG(ret == Z_OK, "%s", "failed to initialize zlib library");
zstream.avail_in = len;
zstream.next_in = (Byte *)input;
zstream.next_out = this->out;
zstream.avail_out = S3_ZIP_DECOMPRESS_CHUNKSIZE;
zstream.next_in = input;
zstream.next_out = output;
zstream.avail_out = out_len;
ret = inflate(&zstream, Z_FULL_FLUSH);
......@@ -141,6 +147,7 @@ TEST_F(CompressWriterTest, AbleToCompressOneSmallString) {
TEST_F(CompressWriterTest, AbleToWriteServeralTimesBeforeClose) {
unsigned int i, times = 100;
// 44 chars
const char input[] = "The quick brown fox jumps over the lazy dog";
for (i = 0; i < times; i++) {
......@@ -157,7 +164,56 @@ TEST_F(CompressWriterTest, AbleToWriteServeralTimesBeforeClose) {
}
TEST_F(CompressWriterTest, AbleToWriteLargerThanCompressChunkSize) {
char *input = new char[S3_ZIP_COMPRESS_CHUNKSIZE + 1];
const char pangram[] = "The quick brown fox jumps over the lazy dog";
unsigned int times = S3_ZIP_COMPRESS_CHUNKSIZE / (sizeof(pangram) - 1) + 1;
string input;
for (unsigned int i = 0; i < times; i++) input.append(pangram);
compressWriter.write(input.c_str(), input.length());
compressWriter.close();
Byte *result = new Byte[input.length() + 1];
this->coreUncompress((Byte *)writer.getRawData(), writer.getDataSize(), result,
input.length() + 1);
EXPECT_TRUE(memcmp(input.c_str(), result, input.length()) == 0);
delete[] result;
}
#include <memory>
#include <random>
// Compressing already-compressed data may produce output larger than the input after GZIP compression.
TEST_F(CompressWriterTest, CompressCompressedData) {
std::random_device rd;
std::default_random_engine re(rd());
// 25 is an empirical number to trigger the corner case.
size_t dataLen = S3_ZIP_COMPRESS_CHUNKSIZE * 25;
size_t charLen = dataLen * 4;
unsigned int *data = new unsigned int[dataLen];
for (size_t i = 0; i < dataLen; i += 4) {
data[i] = re();
}
compressWriter.write((const char *)data, charLen);
compressWriter.close();
vector<char> compressedData;
for (size_t i = 0; i < 3; i++) {
compressWriter.open(this->params);
compressedData.swap(writer.getRawDataVector());
writer.getRawDataVector().clear();
compressWriter.write((const char *)compressedData.data(), compressedData.size());
compressWriter.close();
}
std::unique_ptr<Byte[]> result(new Byte[compressedData.size()]);
this->coreUncompress((Byte *)writer.getRawData(), writer.getDataSize(), result.get(),
compressedData.size());
EXPECT_THROW(compressWriter.write(input, S3_ZIP_COMPRESS_CHUNKSIZE + 1), std::runtime_error);
EXPECT_TRUE(memcmp(compressedData.data(), result.get(), compressedData.size()) == 0);
}
......@@ -84,13 +84,18 @@ TEST_F(S3KeyWriterTest, TestSmallWrite) {
this->close();
}
TEST_F(S3KeyWriterTest, TestSmallChunkSize) {
TEST_F(S3KeyWriterTest, TestChunkSizeSmallerThanInput) {
testParams.setChunkSize(0x100);
EXPECT_CALL(mocks3interface, getUploadId(_, _, _)).WillOnce(Return("uploadId"));
EXPECT_CALL(mocks3interface, uploadPartOfData(_, _, _, _, _, _))
.WillOnce(Return("\"etag1\""))
.WillOnce(Return("\"etag2\""));
EXPECT_CALL(this->mocks3interface, completeMultiPart(_, _, _, _, _)).WillOnce(Return(true));
char data[0x101];
this->open(testParams);
EXPECT_THROW(this->write(data, sizeof(data)), std::runtime_error);
EXPECT_EQ(sizeof(data), this->write(data, sizeof(data)));
this->close();
}
class MockUploadPartOfData {
......@@ -118,9 +123,9 @@ TEST_F(S3KeyWriterTest, TestBufferedWrite) {
char data[0x100];
EXPECT_CALL(this->mocks3interface, getUploadId(_, _, _)).WillOnce(Return("uploadid1"));
EXPECT_CALL(this->mocks3interface, uploadPartOfData(_, _, _, _, 1, "uploadid1"))
.WillOnce(Invoke(MockUploadPartOfData(0x80)));
.WillOnce(Invoke(MockUploadPartOfData(0x100)));
EXPECT_CALL(this->mocks3interface, uploadPartOfData(_, _, _, _, 2, "uploadid1"))
.WillOnce(Invoke(MockUploadPartOfData(0x81)));
.WillOnce(Invoke(MockUploadPartOfData(0x1)));
EXPECT_CALL(this->mocks3interface, completeMultiPart(_, _, _, _, _)).WillOnce(Return(true));
this->open(testParams);
......@@ -172,7 +177,7 @@ TEST_F(S3KeyWriterTest, TestWriteAbortInClosing) {
char data[0x100];
EXPECT_CALL(this->mocks3interface, getUploadId(_, _, _)).WillOnce(Return("uploadid1"));
EXPECT_CALL(this->mocks3interface, uploadPartOfData(_, _, _, _, 1, "uploadid1"))
.WillOnce(Invoke(MockUploadPartOfData(0x80)));
.WillOnce(Invoke(MockUploadPartOfData(0x100)));
EXPECT_CALL(this->mocks3interface, abortUpload(_, _, _, _)).WillOnce(Return(true));
this->open(testParams);
......