提交 ef524f69 编写于 作者: H Hongze Cheng

more code

上级 13229b0e
......@@ -116,6 +116,19 @@ static FORCE_INLINE int32_t tarray2_make_room( //
#define TARRAY2_APPEND(a, e) TARRAY2_INSERT(a, (a)->size, e)
#define TARRAY2_APPEND_PTR(a, ep) TARRAY2_APPEND(a, *(ep))
#define TARRAY2_APPEND_BATCH(a, ep, n) \
({ \
int32_t __ret = 0; \
if ((a)->size + (n) > (a)->capacity) { \
__ret = tarray2_make_room((a), (a)->size + (n), sizeof(typeof((a)->data[0]))); \
} \
if (!__ret) { \
memcpy((a)->data + (a)->size, (ep), sizeof(typeof((a)->data[0])) * (n)); \
(a)->size += (n); \
} \
__ret; \
})
// return (TYPE *)
#define TARRAY2_SEARCH(a, ep, cmp, flag) \
({ \
......
......@@ -71,7 +71,7 @@ struct SSttFileWriterConfig {
int32_t maxRow;
int32_t szPage;
int8_t cmprAlg;
int64_t compactVersion; // compact version
int64_t compactVersion;
STFile file;
SSkmInfo *skmTb;
SSkmInfo *skmRow;
......
......@@ -47,13 +47,15 @@ typedef union {
} STombBlock;
typedef struct {
int32_t numRec;
int32_t size[TOMB_RECORD_ELEM_NUM];
SFDataPtr dp[1];
TABLEID minTbid;
TABLEID maxTbid;
int64_t minVer;
int64_t maxVer;
SFDataPtr dp[1];
int32_t numRec;
int32_t size[TOMB_RECORD_ELEM_NUM];
int8_t cmprAlg;
int8_t rsvd[7];
} STombBlk;
#define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
......@@ -94,13 +96,15 @@ typedef union {
} STbStatisBlock;
typedef struct {
int32_t numRec;
int32_t size[STATIS_RECORD_NUM_ELEM];
SFDataPtr dp[1];
TABLEID minTbid;
TABLEID maxTbid;
int64_t minVer;
int64_t maxVer;
SFDataPtr dp[1];
int32_t numRec;
int32_t size[STATIS_RECORD_NUM_ELEM];
int8_t cmprAlg;
int8_t rsvd[7];
} SStatisBlk;
#define STATIS_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
......
......@@ -69,9 +69,9 @@ _exit:
static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) {
if (reader[0]) {
TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->statisBlkArray, NULL);
TARRAY2_DESTROY(reader[0]->sttBlkArray, NULL);
taosMemoryFree(reader[0]);
reader[0] = NULL;
}
......@@ -86,11 +86,20 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
reader[0]->config[0] = config[0];
if (!reader[0]->config->bufArr) reader[0]->config->bufArr = reader[0]->bufArr;
if (reader[0]->config->bufArr == NULL) {
reader[0]->config->bufArr = reader[0]->bufArr;
}
// open file
code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
TSDB_CHECK_CODE(code, lino, _exit);
if (fname) {
code = tsdbOpenFile(fname, config->szPage, TD_FILE_READ, &reader[0]->fd);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
char fname1[TSDB_FILENAME_LEN];
tsdbTFileName(config->tsdb, config->file, fname1);
code = tsdbOpenFile(fname1, config->szPage, TD_FILE_READ, &reader[0]->fd);
TSDB_CHECK_CODE(code, lino, _exit);
}
// open each segment reader
int64_t size = config->file->size;
......@@ -256,14 +265,12 @@ int32_t tsdbSttFileReadTombBlock(SSttSegReader *reader, const STombBlk *tombBlk,
int64_t size = 0;
for (int32_t i = 0; i < ARRAY_SIZE(dData->dataArr); ++i) {
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT,
TWO_STAGE_COMP, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec,
tombBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec,
&reader->reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t j = 0; j < tombBlk->numRec; ++j) {
code = TARRAY2_APPEND(&dData->dataArr[i], ((int64_t *)(reader->reader->config->bufArr[1]))[j]);
continue;
}
code = TARRAY2_APPEND_BATCH(&dData->dataArr[i], reader->reader->config->bufArr[1], tombBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += tombBlk->size[i];
}
......@@ -292,14 +299,12 @@ int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *stat
int64_t size = 0;
for (int32_t i = 0; i < ARRAY_SIZE(sData->dataArr); ++i) {
code = tsdbDecmprData(reader->reader->config->bufArr[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT,
TWO_STAGE_COMP, &reader->reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec,
statisBlk->cmprAlg, &reader->reader->config->bufArr[1], sizeof(int64_t) * statisBlk->numRec,
&reader->reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
for (int32_t j = 0; j < statisBlk->numRec; ++j) {
code = TARRAY2_APPEND(sData->dataArr + i, ((int64_t *)reader->reader->config->bufArr[1])[j]);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = TARRAY2_APPEND_BATCH(sData->dataArr + i, reader->reader->config->bufArr[1], statisBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += statisBlk->size[i];
}
......@@ -333,7 +338,6 @@ struct SSttFileWriter {
// helper data
SSkmInfo skmTb[1];
SSkmInfo skmRow[1];
int32_t sizeArr[5];
uint8_t *bufArr[5];
STsdbFD *fd;
};
......@@ -362,18 +366,19 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) {
if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow];
}
code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, writer->sizeArr);
int32_t sizeArr[5] = {0};
code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr);
TSDB_CHECK_CODE(code, lino, _exit);
sttBlk->bInfo.offset = writer->file->size;
sttBlk->bInfo.szKey = writer->sizeArr[2] + writer->sizeArr[3];
sttBlk->bInfo.szBlock = writer->sizeArr[0] + writer->sizeArr[1] + sttBlk->bInfo.szKey;
sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3];
sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey;
for (int32_t i = 3; i >= 0; i--) {
if (writer->sizeArr[i]) {
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], writer->sizeArr[i]);
if (sizeArr[i]) {
code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->bufArr[i], sizeArr[i]);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += writer->sizeArr[i];
writer->file->size += sizeArr[i];
}
}
......@@ -390,13 +395,17 @@ _exit:
}
static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
if (STATIS_BLOCK_SIZE(writer->sData)) return 0;
if (STATIS_BLOCK_SIZE(writer->sData) == 0) return 0;
int32_t code = 0;
int32_t lino = 0;
SStatisBlk statisBlk[1] = {{
.numRec = STATIS_BLOCK_SIZE(writer->sData),
.dp[0] =
{
.offset = writer->file->size,
.size = 0,
},
.minTbid =
{
.suid = TARRAY2_FIRST(writer->sData->suid),
......@@ -409,6 +418,8 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
},
.minVer = TARRAY2_FIRST(writer->sData->minVer),
.maxVer = TARRAY2_FIRST(writer->sData->maxVer),
.numRec = STATIS_BLOCK_SIZE(writer->sData),
.cmprAlg = writer->config->cmprAlg,
}};
for (int32_t i = 1; i < STATIS_BLOCK_SIZE(writer->sData); i++) {
......@@ -416,13 +427,10 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
statisBlk->maxVer = TMAX(statisBlk->maxVer, TARRAY2_GET(writer->sData->maxVer, i));
}
statisBlk->dp->offset = writer->file->size;
statisBlk->dp->size = 0;
for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) {
int32_t size;
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(writer->sData->dataArr + i),
TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP,
TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, statisBlk->cmprAlg,
&writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -453,7 +461,11 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
int32_t lino = 0;
STombBlk tombBlk[1] = {{
.numRec = TOMB_BLOCK_SIZE(writer->tData),
.dp[0] =
{
.offset = writer->file->size,
.size = 0,
},
.minTbid =
{
.suid = TARRAY2_FIRST(writer->tData->suid),
......@@ -466,11 +478,8 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
},
.minVer = TARRAY2_FIRST(writer->tData->version),
.maxVer = TARRAY2_FIRST(writer->tData->version),
.dp[0] =
{
.offset = writer->file->size,
.size = 0,
},
.numRec = TOMB_BLOCK_SIZE(writer->tData),
.cmprAlg = writer->config->cmprAlg,
}};
for (int32_t i = 1; i < TOMB_BLOCK_SIZE(writer->tData); i++) {
......@@ -481,7 +490,7 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
for (int32_t i = 0; i < ARRAY_SIZE(writer->tData->dataArr); i++) {
int32_t size;
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->tData->dataArr[i]),
TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP,
TARRAY2_DATA_LEN(&writer->tData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
&writer->config->bufArr[0], 0, &size, &writer->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -509,10 +518,9 @@ static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino;
writer->footer->sttBlkPtr->offset = writer->file->size;
writer->footer->sttBlkPtr->size = TARRAY2_DATA_LEN(writer->sttBlkArray);
if (writer->footer->sttBlkPtr->size) {
writer->footer->sttBlkPtr->offset = writer->file->size;
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->sttBlkArray),
writer->footer->sttBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -530,10 +538,9 @@ static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino;
writer->footer->statisBlkPtr->offset = writer->file->size;
writer->footer->statisBlkPtr->size = TARRAY2_DATA_LEN(writer->statisBlkArray);
if (writer->footer->statisBlkPtr->size) {
writer->footer->statisBlkPtr->offset = writer->file->size;
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->statisBlkArray),
writer->footer->statisBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -551,10 +558,9 @@ static int32_t tsdbSttFileDoWriteTombBlk(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
writer->footer->tombBlkPtr->offset = writer->file->size;
writer->footer->tombBlkPtr->size = TARRAY2_DATA_LEN(writer->tombBlkArray);
if (writer->footer->tombBlkPtr->size) {
writer->footer->tombBlkPtr->offset = writer->file->size;
code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)TARRAY2_DATA(writer->tombBlkArray),
writer->footer->tombBlkPtr->size);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -591,7 +597,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
int32_t flag;
char fname[TSDB_FILENAME_LEN];
if (writer->file->size) {
if (writer->file->size > 0) {
flag = TD_FILE_READ | TD_FILE_WRITE;
} else {
flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC;
......@@ -601,7 +607,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
code = tsdbOpenFile(fname, writer->config->szPage, flag, &writer->fd);
TSDB_CHECK_CODE(code, lino, _exit);
if (!writer->file->size) {
if (writer->file->size == 0) {
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr));
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -618,9 +624,9 @@ _exit:
}
static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
ASSERT(!writer->fd);
ASSERT(writer->fd == NULL);
for (int32_t i = 0; i < ARRAY_SIZE(writer->sizeArr); ++i) {
for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) {
tFree(writer->bufArr[i]);
}
tDestroyTSchema(writer->skmRow->pTSchema);
......@@ -672,12 +678,21 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o
tsdbCloseFile(&writer->fd);
ASSERT(writer->config->file.size < writer->file->size);
STFileOp op = {
.optype = writer->config->file.size ? TSDB_FOP_MODIFY : TSDB_FOP_CREATE,
.fid = writer->config->file.fid,
.of = writer->config->file,
.nf = writer->file[0],
};
STFileOp op;
if (writer->config->file.size == 0) {
op = (STFileOp){
.optype = TSDB_FOP_CREATE,
.fid = writer->config->file.fid,
.nf = writer->file[0],
};
} else {
op = (STFileOp){
.optype = TSDB_FOP_MODIFY,
.fid = writer->config->file.fid,
.of = writer->config->file,
.nf = writer->file[0],
};
}
code = TARRAY2_APPEND(opArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -702,7 +717,6 @@ static int32_t tsdbSttFWriterCloseAbort(SSttFileWriter *writer) {
tsdbCloseFile(&writer->fd);
taosRemoveFile(fname);
}
return 0;
}
......@@ -770,7 +784,7 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
TSDB_CHECK_CODE(code, lino, _exit);
}
STbStatisRecord record[1] = {{
STbStatisRecord record = {
.suid = row->suid,
.uid = row->uid,
.firstKey = key->ts,
......@@ -778,18 +792,17 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) {
.minVer = key->version,
.maxVer = key->version,
.count = 1,
}};
code = tStatisBlockPut(writer->sData, record);
};
code = tStatisBlockPut(writer->sData, &record);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
ASSERT(key->ts >= TARRAY2_LAST(writer->sData->lastKey));
TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key->version);
TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key->version);
if (key->ts > TARRAY2_LAST(writer->sData->lastKey)) {
TARRAY2_LAST(writer->sData->count)++;
TARRAY2_LAST(writer->sData->lastKey) = key->ts;
} else if (key->ts == TARRAY2_LAST(writer->sData->lastKey)) {
} else {
ASSERTS(0, "timestamp should be in ascending order");
}
}
......@@ -830,6 +843,7 @@ int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) {
int32_t code = 0;
int32_t lino = 0;
// TODO: optimize here
SRowInfo row[1];
row->suid = bdata->suid;
for (int32_t i = 0; i < bdata->nRow; i++) {
......@@ -854,17 +868,16 @@ int32_t tsdbSttFileWriteTombRecord(SSttFileWriter *writer, const STombRecord *re
if (!writer->ctx->opened) {
code = tsdbSttFWriterDoOpen(writer);
return code;
}
// end time-series data write
if (writer->bData->nRow > 0) {
code = tsdbSttFileDoWriteTSDataBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
if (writer->bData->nRow > 0) {
code = tsdbSttFileDoWriteTSDataBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (STATIS_BLOCK_SIZE(writer->sData) > 0) {
code = tsdbSttFileDoWriteStatisBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
if (STATIS_BLOCK_SIZE(writer->sData) > 0) {
code = tsdbSttFileDoWriteStatisBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// write SDelRecord
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册