提交 3b77ea00 编写于 作者: A Adam Lee

s3ext: support files without EOL at the end of them

Append an EOL if files don't have it, otherwise the last line will be
combined with first line of next file.
Signed-off-by: NNing Wu <nwu@pivotal.io>
Signed-off-by: NYuan Zhao <yuzhao@pivotal.io>
上级 d5d055b0
#include "gpcheckcloud.h"
bool hasHeader;
char eolString[EOL_CHARS_MAX_LEN + 1];
char eolString[EOL_CHARS_MAX_LEN + 1] = ""; // meaningless for gpcheckcloud
string s3extErrorMessage;
......
......@@ -73,7 +73,9 @@ class S3KeyReader : public Reader {
numOfChunks(0),
curReadingChunk(0),
transferredKeyLen(0),
s3Interface(NULL) {
s3Interface(NULL),
hasEol(false),
eolAppended(false) {
pthread_mutex_init(&this->mutexErrorMessage, NULL);
}
virtual ~S3KeyReader() {
......@@ -159,6 +161,9 @@ class S3KeyReader : public Reader {
S3Interface* s3Interface;
void reset();
bool hasEol;
bool eolAppended;
};
class ChunkBuffer {
......
CREATE READABLE EXTERNAL TABLE s3regress_no_eol_at_eof (t int)
LOCATION('s3://s3-us-west-2.amazonaws.com/@read_prefix@/no_eol_at_eof/ config=@config_file@') FORMAT 'csv';
SELECT count(*) FROM s3regress_no_eol_at_eof;
DROP EXTERNAL TABLE s3regress_no_eol_at_eof;
CREATE READABLE EXTERNAL TABLE s3regress_no_eol_at_eof (t int)
LOCATION('s3://s3-us-west-2.amazonaws.com/@read_prefix@/no_eol_at_eof/ config=@config_file@') FORMAT 'csv';
SELECT count(*) FROM s3regress_no_eol_at_eof;
count
-------
36
(1 row)
DROP EXTERNAL TABLE s3regress_no_eol_at_eof;
test: 0_00_prepare_s3_protocol
# ~ 1s
test: 1_03_bad_data 1_04_empty_prefix 1_05_one_line 1_06_1correct_1wrong 2_02_invalid_region 2_03_invalid_config 2_04_invalid_header 2_05_limit_zero 3_01_create_wet 3_02_quick_shoot_wet 4_01_create_invalid_wet 2_06_invalid_sub_query
test: 1_03_bad_data 1_04_empty_prefix 1_05_one_line 1_06_1correct_1wrong 2_02_invalid_region 2_03_invalid_config 2_04_invalid_header 2_05_limit_zero 3_01_create_wet 3_02_quick_shoot_wet 4_01_create_invalid_wet 2_06_invalid_sub_query 1_17_no_eol_at_eof
# tens of seconds
test: 1_01_normal 1_02_log_error 1_10_all_regions 1_11_gzipped_data 1_12_no_prefix 1_13_parallel1 1_13_parallel2 1_09_partition 3_09_write_big_row 3_10_write_mixed_length_rows 1_15_normal_sub_query 1_16_multiple_files_with_header_line
......
......@@ -113,6 +113,7 @@ static const char *getFormatStr(FunctionCallInfo fcinfo) {
}
bool hasHeader = false;
char eolString[EOL_CHARS_MAX_LEN + 1] = "\n"; // LF by default
static void parseFormatOpts(FunctionCallInfo fcinfo) {
......@@ -124,10 +125,11 @@ static void parseFormatOpts(FunctionCallInfo fcinfo) {
// only TEXT and CSV have detailed options
if (fmttype_is_csv(fmtcode) || fmttype_is_text(fmtcode)) {
if (strstr(fmtopts, "header") != NULL)
if (strstr(fmtopts, "header") != NULL) {
hasHeader = true;
else
} else {
hasHeader = false;
}
// detect end-of-line terminator
const char *newline_str = strstr(fmtopts, "newline");
......
......@@ -221,6 +221,15 @@ uint64_t S3KeyReader::read(char* buf, uint64_t count) {
do {
// confirm there is no more available data, done with this file
if (this->transferredKeyLen >= fileLen) {
if (!this->hasEol && !this->eolAppended) {
uint64_t eolLen = strlen(eolString);
strncpy(buf, eolString, eolLen);
this->eolAppended = true;
return eolLen;
}
return 0;
}
......@@ -237,15 +246,19 @@ uint64_t S3KeyReader::read(char* buf, uint64_t count) {
}
this->transferredKeyLen += readLen;
if (this->transferredKeyLen == fileLen) {
if (buf[readLen - 1] == '\r' || buf[readLen - 1] == '\n') {
this->hasEol = true;
}
}
if (readLen < count) {
this->curReadingChunk++;
}
count -= readLen;
// retry to confirm whether thread reading is finished or chunk size is
// divisible by get()'s buffer size
} while (readLen == 0);
} while (readLen == 0); // retry to confirm whether thread reading is finished or chunk size is
// divisible by get()'s buffer size
return readLen;
}
......@@ -260,6 +273,9 @@ void S3KeyReader::reset() {
this->chunkBuffers.clear();
this->threads.clear();
this->hasEol = false;
this->eolAppended = false;
}
void S3KeyReader::close() {
......
......@@ -25,9 +25,12 @@ class MockGPReader : public GPReader {
class GPReaderTest : public testing::Test {
protected:
virtual void SetUp() {
eolString[0] = '\0';
InitConfig(this->params, "data/s3test.conf", "default");
}
virtual void TearDown() {
eolString[0] = '\n';
eolString[1] = '\0';
}
S3Params params;
......
......@@ -31,6 +31,9 @@ class S3BucketReaderTest : public testing::Test {
// TearDown() is invoked immediately after a test finishes.
virtual void TearDown() {
eolString[0] = '\n';
eolString[1] = '\0';
delete bucketReader;
s3ext_segid = 0;
......
......@@ -11,7 +11,8 @@ using ::testing::Throw;
using ::testing::_;
bool hasHeader = false;
char eolString[EOL_CHARS_MAX_LEN + 1] = "\n";
char eolString[EOL_CHARS_MAX_LEN + 1] = "\n"; // LF by default
string s3extErrorMessage;
......@@ -38,13 +39,20 @@ class S3KeyReaderTest : public testing::Test, public S3KeyReader {
protected:
// Remember that SetUp() is run immediately before a test starts.
virtual void SetUp() {
memset(buffer, 0, 256);
eolString[0] = '\n';
eolString[1] = '\0';
QueryCancelPending = false;
this->setS3InterfaceService(&s3Interface);
}
// TearDown() is invoked immediately after a test finishes.
virtual void TearDown() {
this->close();
QueryCancelPending = false;
}
......@@ -250,6 +258,7 @@ TEST_F(S3KeyReaderTest, ReadWithSingleChunk) {
this->open(params);
EXPECT_EQ((uint64_t)255, this->read(buffer, 64 * 1024));
EXPECT_EQ((uint64_t)1, this->read(buffer, 64 * 1024));
EXPECT_EQ((uint64_t)0, this->read(buffer, 64 * 1024));
}
......@@ -276,6 +285,7 @@ TEST_F(S3KeyReaderTest, ReadWithSingleChunkNormalCase) {
EXPECT_EQ((uint64_t)32, this->read(buffer, 32));
EXPECT_EQ((uint64_t)32, this->read(buffer, 32));
EXPECT_EQ((uint64_t)31, this->read(buffer, 32));
EXPECT_EQ((uint64_t)1, this->read(buffer, 32));
EXPECT_EQ((uint64_t)0, this->read(buffer, 32));
}
......@@ -293,6 +303,7 @@ TEST_F(S3KeyReaderTest, ReadWithSmallBuffer) {
EXPECT_EQ((uint64_t)64, this->read(buffer, 64));
EXPECT_EQ((uint64_t)64, this->read(buffer, 64));
EXPECT_EQ((uint64_t)63, this->read(buffer, 64));
EXPECT_EQ((uint64_t)1, this->read(buffer, 64));
EXPECT_EQ((uint64_t)0, this->read(buffer, 64));
}
......@@ -326,6 +337,7 @@ TEST_F(S3KeyReaderTest, ResetByInvokingClose) {
EXPECT_EQ((uint64_t)64, this->read(buffer, 64));
EXPECT_EQ((uint64_t)64, this->read(buffer, 64));
EXPECT_EQ((uint64_t)63, this->read(buffer, 64));
EXPECT_EQ((uint64_t)1, this->read(buffer, 64));
EXPECT_EQ((uint64_t)0, this->read(buffer, 64));
this->close();
......@@ -349,6 +361,7 @@ TEST_F(S3KeyReaderTest, ReadWithSmallKeySize) {
this->open(params);
EXPECT_EQ((uint64_t)2, this->read(buffer, 64));
EXPECT_EQ((uint64_t)1, this->read(buffer, 64));
EXPECT_EQ((uint64_t)0, this->read(buffer, 64));
}
......@@ -370,6 +383,7 @@ TEST_F(S3KeyReaderTest, ReadWithSmallChunk) {
EXPECT_EQ((uint64_t)64, this->read(buffer, 64));
EXPECT_EQ((uint64_t)64, this->read(buffer, 64));
EXPECT_EQ((uint64_t)63, this->read(buffer, 64));
EXPECT_EQ((uint64_t)1, this->read(buffer, 64));
EXPECT_EQ((uint64_t)0, this->read(buffer, 64));
}
......@@ -391,6 +405,7 @@ TEST_F(S3KeyReaderTest, ReadWithSmallChunkDividedKeySize) {
EXPECT_EQ((uint64_t)64, this->read(buffer, 64));
EXPECT_EQ((uint64_t)64, this->read(buffer, 64));
EXPECT_EQ((uint64_t)64, this->read(buffer, 64));
EXPECT_EQ((uint64_t)1, this->read(buffer, 64));
EXPECT_EQ((uint64_t)0, this->read(buffer, 64));
}
......@@ -405,6 +420,7 @@ TEST_F(S3KeyReaderTest, ReadWithChunkLargerThanReadBufferAndKeySize) {
this->open(params);
EXPECT_EQ((uint64_t)255, this->read(buffer, 255));
EXPECT_EQ((uint64_t)1, this->read(buffer, 255));
EXPECT_EQ((uint64_t)0, this->read(buffer, 255));
}
......@@ -428,6 +444,7 @@ TEST_F(S3KeyReaderTest, ReadWithKeyLargerThanChunkSize) {
EXPECT_EQ((uint64_t)255, this->read(buffer, 255));
EXPECT_EQ((uint64_t)255, this->read(buffer, 255));
EXPECT_EQ((uint64_t)4, this->read(buffer, 255));
EXPECT_EQ((uint64_t)1, this->read(buffer, 255));
EXPECT_EQ((uint64_t)0, this->read(buffer, 255));
}
......@@ -442,6 +459,7 @@ TEST_F(S3KeyReaderTest, ReadWithSameKeyChunkReadSize) {
this->open(params);
EXPECT_EQ((uint64_t)255, this->read(buffer, 255));
EXPECT_EQ((uint64_t)1, this->read(buffer, 255));
EXPECT_EQ((uint64_t)0, this->read(buffer, 255));
}
......@@ -466,6 +484,7 @@ TEST_F(S3KeyReaderTest, MTReadWith2Chunks) {
EXPECT_EQ((uint64_t)32, this->read(buffer, 32));
EXPECT_EQ((uint64_t)32, this->read(buffer, 32));
EXPECT_EQ((uint64_t)31, this->read(buffer, 32));
EXPECT_EQ((uint64_t)1, this->read(buffer, 32));
EXPECT_EQ((uint64_t)0, this->read(buffer, 32));
}
......@@ -490,6 +509,7 @@ TEST_F(S3KeyReaderTest, MTReadWithRedundantChunks) {
EXPECT_EQ((uint64_t)32, this->read(buffer, 32));
EXPECT_EQ((uint64_t)32, this->read(buffer, 32));
EXPECT_EQ((uint64_t)31, this->read(buffer, 32));
EXPECT_EQ((uint64_t)1, this->read(buffer, 32));
EXPECT_EQ((uint64_t)0, this->read(buffer, 32));
}
......@@ -514,6 +534,7 @@ TEST_F(S3KeyReaderTest, MTReadWithReusedAndUnreusedChunks) {
EXPECT_EQ((uint64_t)32, this->read(buffer, 32));
EXPECT_EQ((uint64_t)32, this->read(buffer, 32));
EXPECT_EQ((uint64_t)31, this->read(buffer, 32));
EXPECT_EQ((uint64_t)1, this->read(buffer, 32));
EXPECT_EQ((uint64_t)0, this->read(buffer, 32));
}
......@@ -534,6 +555,7 @@ TEST_F(S3KeyReaderTest, MTReadWithChunksSmallerThanReadBuffer) {
EXPECT_EQ((uint64_t)64, this->read(buffer, 127));
EXPECT_EQ((uint64_t)64, this->read(buffer, 127));
EXPECT_EQ((uint64_t)63, this->read(buffer, 127));
EXPECT_EQ((uint64_t)1, this->read(buffer, 127));
EXPECT_EQ((uint64_t)0, this->read(buffer, 127));
}
......@@ -562,6 +584,7 @@ TEST_F(S3KeyReaderTest, MTReadWithFragmentalReadRequests) {
EXPECT_EQ((uint64_t)31, this->read(buffer, 31));
EXPECT_EQ((uint64_t)31, this->read(buffer, 31));
EXPECT_EQ((uint64_t)1, this->read(buffer, 31));
EXPECT_EQ((uint64_t)1, this->read(buffer, 31));
EXPECT_EQ((uint64_t)0, this->read(buffer, 31));
}
......@@ -624,6 +647,7 @@ TEST_F(S3KeyReaderTest, MTReadWithHundredsOfThreads) {
EXPECT_EQ((uint64_t)31, this->read(buffer, 31));
EXPECT_EQ((uint64_t)31, this->read(buffer, 31));
EXPECT_EQ((uint64_t)2, this->read(buffer, 31));
EXPECT_EQ((uint64_t)1, this->read(buffer, 31));
EXPECT_EQ((uint64_t)0, this->read(buffer, 31));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册