diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 3ee4980012fd1158afd04e1237c99324db98274b..91098d2ee3bc2c59b79406b02a1356761ef6463f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -165,6 +165,7 @@ void tBlockDataDestroy(SBlockData *pBlockData); int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid); void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); +int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid); int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); void tBlockDataClear(SBlockData *pBlockData); diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index 6c5c832fe41c8a7225144ebae88836b0bf028c5b..c307ec26295fa7b25535418cbc29c0cc3e20e374 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -24,8 +24,8 @@ extern "C" { #endif typedef TARRAY2(SSttBlk) TSttBlkArray; +typedef TARRAY2(SStatisBlk) TStatisBlkArray; typedef TARRAY2(SDelBlk) TDelBlkArray; -typedef TARRAY2(STbStatisBlk) TStatisBlkArray; // SSttFileReader ========================================== typedef struct SSttFileReader SSttFileReader; @@ -41,20 +41,20 @@ int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReade // SSttSegReader int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray); -int32_t tsdbSttFileReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray); int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray); +int32_t tsdbSttFileReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray); int32_t tsdbSttFileReadSttBlock(SSttSegReader *reader, const SSttBlk *sttBlk, SBlockData *bData); int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SDelBlock *dData); -int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData); +int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData); struct SSttFileReaderConfig { STsdb *tsdb; + int32_t szPage; + STFile file[1]; SSkmInfo *skmTb; SSkmInfo *skmRow; uint8_t **aBuf; - int32_t szPage; - STFile file[1]; }; // SSttFileWriter ========================================== diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h index c591960a8e65d1b0640c3846a3e9e526f7077b64..eaf8234b36978369c47f9b2b6d2a20fd82b62545 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h @@ -37,7 +37,7 @@ typedef union { } SDelRecord; typedef union { - TARRAY2(int64_t) aData[DEL_RECORD_NUM_ELEM]; + TARRAY2(int64_t) dataArr[DEL_RECORD_NUM_ELEM]; struct { TARRAY2(int64_t) suid[1]; TARRAY2(int64_t) uid[1]; @@ -74,9 +74,9 @@ typedef union { int64_t suid; int64_t uid; int64_t firstKey; - int64_t firstVer; + int64_t firstKeyVer; int64_t lastKey; - int64_t lastVer; + int64_t lastKeyVer; int64_t minVer; int64_t maxVer; int64_t count; @@ -84,29 +84,29 @@ typedef union { } STbStatisRecord; typedef union { - TARRAY2(int64_t) aData[STATIS_RECORD_NUM_ELEM]; + TARRAY2(int64_t) dataArr[STATIS_RECORD_NUM_ELEM]; struct { TARRAY2(int64_t) suid[1]; TARRAY2(int64_t) uid[1]; TARRAY2(int64_t) firstKey[1]; - TARRAY2(int64_t) firstVer[1]; + TARRAY2(int64_t) firstKeyVer[1]; TARRAY2(int64_t) lastKey[1]; - TARRAY2(int64_t) lastVer[1]; + TARRAY2(int64_t) lastKeyVer[1]; TARRAY2(int64_t) minVer[1]; TARRAY2(int64_t) maxVer[1]; - TARRAY2(int64_t) aCount[1]; + TARRAY2(int64_t) count[1]; }; } STbStatisBlock; -typedef struct STbStatisBlk { +typedef struct SStatisBlk { int32_t numRec; int32_t size[STATIS_RECORD_NUM_ELEM]; - TABLEID minTid; - TABLEID maxTid; + TABLEID minTbid; + TABLEID maxTbid; int64_t minVer; int64_t maxVer; SFDataPtr dp[1]; -} STbStatisBlk; +} SStatisBlk; #define STATIS_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index af4ba579c799705e3aa35a4bbc2025029135d04e..e91bd082617ecfe382a71e1d52c08e18c0152361 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -37,10 +37,10 @@ struct SSttSegReader { bool sttBlkLoaded; bool delBlkLoaded; bool statisBlkLoaded; - } ctx; + } ctx[1]; TSttBlkArray sttBlkArray[1]; - TDelBlkArray delBlkArray[1]; TStatisBlkArray statisBlkArray[1]; + TDelBlkArray delBlkArray[1]; }; // SSttFileReader @@ -60,7 +60,7 @@ static int32_t tsdbSttSegReaderOpen(SSttFileReader *reader, int64_t offset, SStt _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); taosMemoryFree(segReader[0]); segReader[0] = NULL; } @@ -68,19 +68,13 @@ _exit: } static int32_t tsdbSttSegReaderClose(SSttSegReader **reader) { - if (!reader[0]) return 0; - - if (reader[0]->ctx.sttBlkLoaded) { + if (reader[0]) { TARRAY2_FREE(reader[0]->sttBlkArray); - } - if (reader[0]->ctx.delBlkLoaded) { TARRAY2_FREE(reader[0]->delBlkArray); - } - if (reader[0]->ctx.statisBlkLoaded) { TARRAY2_FREE(reader[0]->statisBlkArray); + taosMemoryFree(reader[0]); + reader[0] = NULL; } - taosMemoryFree(reader[0]); - reader[0] = NULL; return 0; } @@ -116,17 +110,19 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); tsdbSttFileReaderClose(reader); } return code; } int32_t tsdbSttFileReaderClose(SSttFileReader **reader) { - tsdbCloseFile(&reader[0]->fd); - TARRAY2_CLEAR_FREE(reader[0]->readerArray, tsdbSttSegReaderClose); - taosMemoryFree(reader[0]); - reader[0] = NULL; + if (reader[0]) { + tsdbCloseFile(&reader[0]->fd); + TARRAY2_CLEAR_FREE(reader[0]->readerArray, tsdbSttSegReaderClose); + taosMemoryFree(reader[0]); + reader[0] = NULL; + } return 0; } @@ -137,24 +133,27 @@ int32_t tsdbSttFileReaderGetSegReader(SSttFileReader *reader, const TSttSegReade // SSttFSegReader int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray **statisBlkArray) { - if (!reader->ctx.statisBlkLoaded) { + if (!reader->ctx->statisBlkLoaded) { if (reader->footer->statisBlkPtr->size > 0) { - ASSERT(reader->footer->statisBlkPtr->size % sizeof(STbStatisBlk) == 0); + ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0); - int32_t size = reader->footer->statisBlkPtr->size / sizeof(STbStatisBlk); + int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk); void *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size); if (!data) return TSDB_CODE_OUT_OF_MEMORY; int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->statisBlkPtr->offset, data, reader->footer->statisBlkPtr->size); - if (code) return code; + if (code) { + taosMemoryFree(data); + return code; + } TARRAY2_INIT_EX(reader->statisBlkArray, size, size, data); } else { TARRAY2_INIT(reader->statisBlkArray); } - reader->ctx.statisBlkLoaded = true; + reader->ctx->statisBlkLoaded = true; } statisBlkArray[0] = reader->statisBlkArray; @@ -162,7 +161,7 @@ int32_t tsdbSttFileReadStatisBlk(SSttSegReader *reader, const TStatisBlkArray ** } int32_t tsdbSttFileReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlkArray) { - if (!reader->ctx.delBlkLoaded) { + if (!reader->ctx->delBlkLoaded) { if (reader->footer->delBlkPtr->size > 0) { ASSERT(reader->footer->delBlkPtr->size % sizeof(SDelBlk) == 0); @@ -172,14 +171,17 @@ int32_t tsdbSttFileReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlk int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->delBlkPtr->offset, data, reader->footer->delBlkPtr->size); - if (code) return code; + if (code) { + taosMemoryFree(data); + return code; + } TARRAY2_INIT_EX(reader->delBlkArray, size, size, data); } else { TARRAY2_INIT(reader->delBlkArray); } - reader->ctx.delBlkLoaded = true; + reader->ctx->delBlkLoaded = true; } delBlkArray[0] = reader->delBlkArray; @@ -187,7 +189,7 @@ int32_t tsdbSttFileReadDelBlk(SSttSegReader *reader, const TDelBlkArray **delBlk } int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlkArray) { - if (!reader->ctx.sttBlkLoaded) { + if (!reader->ctx->sttBlkLoaded) { if (reader->footer->sttBlkPtr->size > 0) { ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0); @@ -197,14 +199,17 @@ int32_t tsdbSttFileReadSttBlk(SSttSegReader *reader, const TSttBlkArray **sttBlk int32_t code = tsdbReadFile(reader->reader->fd, reader->footer->sttBlkPtr->offset, data, reader->footer->sttBlkPtr->size); - if (code) return code; + if (code) { + taosMemoryFree(data); + return code; + } TARRAY2_INIT_EX(reader->sttBlkArray, size, size, data); } else { TARRAY2_INIT(reader->sttBlkArray); } - reader->ctx.sttBlkLoaded = true; + reader->ctx->sttBlkLoaded = true; } sttBlkArray[0] = reader->sttBlkArray; @@ -230,7 +235,7 @@ int32_t tsdbSttFileReadDelBlock(SSttSegReader *reader, const SDelBlk *delBlk, SD if (code) TSDB_CHECK_CODE(code, lino, _exit); int64_t size = 0; - for (int32_t i = 0; i < ARRAY_SIZE(dData->aData); ++i) { + for (int32_t i = 0; i < ARRAY_SIZE(dData->dataArr); ++i) { code = tsdbDecmprData(reader->reader->config->aBuf[0] + size, delBlk->size[i], TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, NULL, 0, NULL); // TODO TSDB_CHECK_CODE(code, lino, _exit); @@ -246,7 +251,7 @@ _exit: return code; } -int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *statisBlk, STbStatisBlock *sData) { +int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *sData) { int32_t code = 0; int32_t lino = 0; int32_t vid = TD_VID(reader->reader->config->tsdb->pVnode); @@ -259,7 +264,7 @@ int32_t tsdbSttFileReadStatisBlock(SSttSegReader *reader, const STbStatisBlk *st if (code) TSDB_CHECK_CODE(code, lino, _exit); int64_t size = 0; - for (int32_t i = 0; i < ARRAY_SIZE(sData->aData); ++i) { + for (int32_t i = 0; i < ARRAY_SIZE(sData->dataArr); ++i) { code = tsdbDecmprData(reader->reader->config->aBuf[0] + size, statisBlk->size[i], TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, NULL, 0, NULL); // TODO TSDB_CHECK_CODE(code, lino, _exit); @@ -280,23 +285,24 @@ _exit: struct SSttFileWriter { SSttFileWriterConfig config[1]; struct { - bool opened; + bool opened; + TABLEID tbid[1]; } ctx[1]; // file STFile file[1]; // data TSttBlkArray sttBlkArray[1]; - TDelBlkArray delBlkArray[1]; TStatisBlkArray statisBlkArray[1]; + TDelBlkArray delBlkArray[1]; SSttFooter footer[1]; SBlockData bData[1]; - SDelBlock dData[1]; STbStatisBlock sData[1]; + SDelBlock dData[1]; // helper data SSkmInfo skmTb[1]; SSkmInfo skmRow[1]; - int32_t aBufSize[5]; - uint8_t *aBuf[5]; + int32_t sizeArr[5]; + uint8_t *bufArr[5]; STsdbFD *fd; }; @@ -320,29 +326,29 @@ static int32_t tsdbSttFileDoWriteTSDataBlock(SSttFileWriter *writer) { if (sttBlk->maxVer < writer->bData->aVersion[iRow]) sttBlk->maxVer = writer->bData->aVersion[iRow]; } - code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->aBuf, writer->aBufSize); + code = tCmprBlockData(writer->bData, writer->config->cmprAlg, NULL, NULL, writer->config->aBuf, writer->sizeArr); TSDB_CHECK_CODE(code, lino, _exit); sttBlk->bInfo.offset = writer->file->size; - sttBlk->bInfo.szKey = writer->aBufSize[2] + writer->aBufSize[3]; - sttBlk->bInfo.szBlock = writer->aBufSize[0] + writer->aBufSize[1] + sttBlk->bInfo.szKey; + sttBlk->bInfo.szKey = writer->sizeArr[2] + writer->sizeArr[3]; + sttBlk->bInfo.szBlock = writer->sizeArr[0] + writer->sizeArr[1] + sttBlk->bInfo.szKey; for (int32_t i = 3; i >= 0; i--) { - if (writer->aBufSize[i]) { - code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[i], writer->aBufSize[i]); + if (writer->sizeArr[i]) { + code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[i], writer->sizeArr[i]); TSDB_CHECK_CODE(code, lino, _exit); - writer->file->size += writer->aBufSize[i]; + writer->file->size += writer->sizeArr[i]; } } - tBlockDataClear(writer->bData); code = TARRAY2_APPEND_PTR(writer->sttBlkArray, sttBlk); TSDB_CHECK_CODE(code, lino, _exit); + tBlockDataClear(writer->bData); + _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, - tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } @@ -353,10 +359,18 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { int32_t code = 0; int32_t lino = 0; - STbStatisBlk statisBlk[1] = {{ + SStatisBlk statisBlk[1] = {{ .numRec = STATIS_BLOCK_SIZE(writer->sData), - .minTid = {.suid = TARRAY2_FIRST(writer->sData->suid), .uid = TARRAY2_FIRST(writer->sData->uid)}, - .maxTid = {.suid = TARRAY2_LAST(writer->sData->suid), .uid = TARRAY2_LAST(writer->sData->uid)}, + .minTbid = + { + .suid = TARRAY2_FIRST(writer->sData->suid), + .uid = TARRAY2_FIRST(writer->sData->uid), + }, + .maxTbid = + { + .suid = TARRAY2_LAST(writer->sData->suid), + .uid = TARRAY2_LAST(writer->sData->uid), + }, .minVer = TARRAY2_FIRST(writer->sData->minVer), .maxVer = TARRAY2_FIRST(writer->sData->maxVer), }}; @@ -369,30 +383,35 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { statisBlk->dp->offset = writer->file->size; statisBlk->dp->size = 0; - for (int32_t i = 0; i < ARRAY_SIZE(writer->sData->aData); i++) { + for (int32_t i = 0; i < STATIS_RECORD_NUM_ELEM; i++) { int32_t size; - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->sData->aData[i]), TARRAY2_DATA_LEN(&writer->sData->aData[i]), - TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, &writer->config->aBuf[0], 0, &size, - &writer->config->aBuf[1]); + code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->sData->dataArr[i]), + TARRAY2_DATA_LEN(&writer->sData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, TWO_STAGE_COMP, + &writer->config->aBuf[0], 0, &size, &writer->config->aBuf[1]); TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], size); + TSDB_CHECK_CODE(code, lino, _exit); + statisBlk->size[i] = size; statisBlk->dp->size += size; + writer->file->size += size; } - tStatisBlockClear(writer->sData); - code = TARRAY2_APPEND_PTR(writer->statisBlkArray, statisBlk); TSDB_CHECK_CODE(code, lino, _exit); + tStatisBlockClear(writer->sData); + _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, - tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) { + return 0; #if 0 if (writer->dData->nRow == 0) return 0; @@ -437,7 +456,6 @@ _exit: } return code; #endif - return 0; } static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { @@ -456,8 +474,7 @@ static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, - tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } @@ -478,8 +495,7 @@ static int32_t tsdbSttFileDoWriteStatisBlk(SSttFileWriter *writer) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, - tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } @@ -507,8 +523,8 @@ _exit: } static int32_t tsdbSttFileDoWriteFooter(SSttFileWriter *writer) { - int32_t code = - tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)&writer->footer, sizeof(writer->footer)); + writer->footer->prevFooter = writer->config->file.size; + int32_t code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->footer, sizeof(writer->footer)); writer->file->size += sizeof(writer->footer); return code; } @@ -523,7 +539,7 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { writer->file->stt->nseg++; if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb; if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow; - if (!writer->config->aBuf) writer->config->aBuf = writer->aBuf; + if (!writer->config->aBuf) writer->config->aBuf = writer->bufArr; // open file int32_t flag; @@ -541,7 +557,6 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { if (!writer->file->size) { uint8_t hdr[TSDB_FHDR_SIZE] = {0}; - code = tsdbWriteFile(writer->fd, 0, hdr, sizeof(hdr)); TSDB_CHECK_CODE(code, lino, _exit); writer->file->size += sizeof(hdr); @@ -549,26 +564,26 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); } else { writer->ctx->opened = true; } - return 0; + return code; } static void tsdbSttFWriterDoClose(SSttFileWriter *writer) { ASSERT(!writer->fd); - for (int32_t i = 0; i < ARRAY_SIZE(writer->aBufSize); ++i) { - tFree(writer->aBuf[i]); + for (int32_t i = 0; i < ARRAY_SIZE(writer->sizeArr); ++i) { + tFree(writer->bufArr[i]); } tDestroyTSchema(writer->skmRow->pTSchema); tDestroyTSchema(writer->skmTb->pTSchema); tStatisBlockFree(writer->sData); tDelBlockFree(writer->dData); tBlockDataDestroy(writer->bData); - TARRAY2_FREE(writer->statisBlkArray); TARRAY2_FREE(writer->delBlkArray); + TARRAY2_FREE(writer->statisBlkArray); TARRAY2_FREE(writer->sttBlkArray); } @@ -619,7 +634,7 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, STFileOp *op) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); } return code; } @@ -656,7 +671,7 @@ int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp * int32_t vid = TD_VID(writer[0]->config->tsdb->pVnode); if (!writer[0]->ctx->opened) { - op->optype = TSDB_FOP_NONE; + if (op) op->optype = TSDB_FOP_NONE; } else { if (abort) { code = tsdbSttFWriterCloseAbort(writer[0]); @@ -672,7 +687,7 @@ int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, STFileOp * _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); } return code; } @@ -680,18 +695,30 @@ _exit: int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { int32_t code = 0; int32_t lino = 0; + int32_t vid = TD_VID(writer->config->tsdb->pVnode); if (!writer->ctx->opened) { code = tsdbSttFWriterDoOpen(writer); TSDB_CHECK_CODE(code, lino, _exit); } - TSDBROW *pRow = &row->row; - TSDBKEY key = TSDBROW_KEY(pRow); - if (!TABLE_SAME_SCHEMA(writer->bData->suid, writer->bData->uid, row->suid, row->uid)) { + TSDBKEY key[1] = {TSDBROW_KEY(&row->row)}; + if (!TABLE_SAME_SCHEMA(row->suid, row->uid, writer->ctx->tbid->suid, writer->ctx->tbid->uid)) { code = tsdbSttFileDoWriteTSDataBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); + code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb); + TSDB_CHECK_CODE(code, lino, _exit); + + TABLEID id = {.suid = row->suid, .uid = row->suid ? 0 : row->uid}; + code = tBlockDataInit(writer->bData, &id, writer->config->skmTb->pTSchema, NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (writer->ctx->tbid->uid != row->uid) { + writer->ctx->tbid->suid = row->suid; + writer->ctx->tbid->uid = row->uid; + if (STATIS_BLOCK_SIZE(writer->sData) >= writer->config->maxRow) { code = tsdbSttFileDoWriteStatisBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); @@ -700,54 +727,63 @@ int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row) { STbStatisRecord record[1] = {{ .suid = row->suid, .uid = row->uid, - .firstKey = key.ts, - .firstVer = key.version, - .lastKey = key.ts, - .lastVer = key.version, - .minVer = key.version, - .maxVer = key.version, + .firstKey = key->ts, + .firstKeyVer = key->version, + .lastKey = key->ts, + .lastKeyVer = key->version, + .minVer = key->version, + .maxVer = key->version, .count = 1, }}; code = tStatisBlockPut(writer->sData, record); TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbUpdateSkmTb(writer->config->tsdb, (TABLEID *)row, writer->config->skmTb); - TSDB_CHECK_CODE(code, lino, _exit); - - TABLEID id = {.suid = row->suid, .uid = row->suid ? 0 : row->uid}; - code = tBlockDataInit(writer->bData, &id, writer->config->skmTb->pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); } if (row->row.type == TSDBROW_ROW_FMT) { - code = tsdbUpdateSkmRow(writer->config->tsdb, (TABLEID *)row, TSDBROW_SVERSION(pRow), writer->config->skmRow); + code = tsdbUpdateSkmRow(writer->config->tsdb, writer->ctx->tbid, // + TSDBROW_SVERSION(&row->row), writer->config->skmRow); TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataAppendRow(writer->bData, pRow, writer->config->skmRow->pTSchema, row->uid); - TSDB_CHECK_CODE(code, lino, _exit); + // row to col conversion + if (key->version <= writer->config->compactVersion) { + if (writer->bData->nRow > 0 // + && (writer->bData->uid // + ? writer->bData->uid + : writer->bData->aUid[writer->bData->nRow - 1]) == row->uid // + && writer->bData->aTSKEY[writer->bData->nRow - 1] == key->ts // + ) { + code = tBlockDataUpdateRow(writer->bData, &row->row, writer->config->skmRow->pTSchema); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tBlockDataAppendRow(writer->bData, &row->row, writer->config->skmRow->pTSchema, row->uid); + TSDB_CHECK_CODE(code, lino, _exit); + } + } else { + if (writer->bData->nRow >= writer->config->maxRow) { + code = tsdbSttFileDoWriteTSDataBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } - if (writer->bData->nRow >= writer->config->maxRow) { - code = tsdbSttFileDoWriteTSDataBlock(writer); + code = tBlockDataAppendRow(writer->bData, &row->row, writer->config->skmRow->pTSchema, row->uid); TSDB_CHECK_CODE(code, lino, _exit); } - TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key.version); - TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key.version); - if (key.ts > TARRAY2_LAST(writer->sData->lastKey)) { - TARRAY2_LAST(writer->sData->lastKey) = key.ts; - TARRAY2_LAST(writer->sData->lastVer) = key.version; - TARRAY2_LAST(writer->sData->aCount)++; - } else if (key.ts == TARRAY2_LAST(writer->sData->lastKey)) { - TARRAY2_LAST(writer->sData->lastVer) = key.version; + TARRAY2_LAST(writer->sData->minVer) = TMIN(TARRAY2_LAST(writer->sData->minVer), key->version); + TARRAY2_LAST(writer->sData->maxVer) = TMAX(TARRAY2_LAST(writer->sData->maxVer), key->version); + if (key->ts > TARRAY2_LAST(writer->sData->lastKey)) { + TARRAY2_LAST(writer->sData->lastKey) = key->ts; + TARRAY2_LAST(writer->sData->lastKeyVer) = key->version; + TARRAY2_LAST(writer->sData->count)++; + } else if (key->ts == TARRAY2_LAST(writer->sData->lastKey)) { + TARRAY2_LAST(writer->sData->lastKeyVer) = key->version; } else { ASSERTS(0, "timestamp should be in ascending order"); } _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, - tstrerror(code)); + TSDB_ERROR_LOG(vid, lino, code); } return code; } @@ -768,32 +804,47 @@ int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *bdata) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, - tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } - return 0; + return code; } int32_t tsdbSttFileWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData) { ASSERTS(0, "TODO: Not implemented yet"); int32_t code; + int32_t lino; + int32_t vid = TD_VID(writer->config->tsdb->pVnode); + if (!writer->ctx->opened) { code = tsdbSttFWriterDoOpen(writer); return code; } - // writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid; // suid - // writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid; // uid - // writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version; // version - // writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey; // skey - // writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey - // writer->dData[0].nRow++; + code = tsdbSttFileDoWriteTSDataBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); - // if (writer->dData[0].nRow >= writer->config->maxRow) { - // return tsdbSttFileDoWriteDelBlock(writer); - // } else { - // return 0; - // } - return 0; + code = tsdbSttFileDoWriteStatisBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); + +#if 0 + writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid; // suid + writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid; // uid + writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version; // version + writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey; // skey + writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey + writer->dData[0].nRow++; + + if (writer->dData[0].nRow >= writer->config->maxRow) { + return tsdbSttFileDoWriteDelBlock(writer); + } else { + return 0; + } +#endif + +_exit: + if (code) { + TSDB_ERROR_LOG(vid, lino, code); + } + return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c index fbe88564a2f6788c8d6fc5b977961e6dd234dc34..5e318771c8bd315ffbb5460f4c557f197f7b9337 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c @@ -17,29 +17,29 @@ // SDelBlock ---------- int32_t tDelBlockInit(SDelBlock *delBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) { - TARRAY2_INIT(&delBlock->aData[i]); + for (int32_t i = 0; i < ARRAY_SIZE(delBlock->dataArr); ++i) { + TARRAY2_INIT(&delBlock->dataArr[i]); } return 0; } int32_t tDelBlockFree(SDelBlock *delBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) { - TARRAY2_FREE(&delBlock->aData[i]); + for (int32_t i = 0; i < ARRAY_SIZE(delBlock->dataArr); ++i) { + TARRAY2_FREE(&delBlock->dataArr[i]); } return 0; } int32_t tDelBlockClear(SDelBlock *delBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) { - TARRAY2_CLEAR(&delBlock->aData[i], NULL); + for (int32_t i = 0; i < ARRAY_SIZE(delBlock->dataArr); ++i) { + TARRAY2_CLEAR(&delBlock->dataArr[i], NULL); } return 0; } int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord) { - for (int32_t i = 0; i < ARRAY_SIZE(delBlock->aData); ++i) { - int32_t code = TARRAY2_APPEND(&delBlock->aData[i], delRecord->aData[i]); + for (int32_t i = 0; i < ARRAY_SIZE(delBlock->dataArr); ++i) { + int32_t code = TARRAY2_APPEND(&delBlock->dataArr[i], delRecord->aData[i]); if (code) return code; } return 0; @@ -57,29 +57,29 @@ int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock) { // STbStatisBlock ---------- int32_t tStatisBlockInit(STbStatisBlock *statisBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) { - TARRAY2_INIT(&statisBlock->aData[i]); + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) { + TARRAY2_INIT(&statisBlock->dataArr[i]); } return 0; } int32_t tStatisBlockFree(STbStatisBlock *statisBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) { - TARRAY2_FREE(&statisBlock->aData[i]); + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) { + TARRAY2_FREE(&statisBlock->dataArr[i]); } return 0; } int32_t tStatisBlockClear(STbStatisBlock *statisBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) { - TARRAY2_CLEAR(&statisBlock->aData[i], NULL); + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) { + TARRAY2_CLEAR(&statisBlock->dataArr[i], NULL); } return 0; } int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *statisRecord) { - for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->aData); ++i) { - int32_t code = TARRAY2_APPEND(&statisBlock->aData[i], statisRecord->aData[i]); + for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) { + int32_t code = TARRAY2_APPEND(&statisBlock->dataArr[i], statisRecord->aData[i]); if (code) return code; } return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index ed30591d3c95d9c34934b67083e353ea1a58b9f7..e526d67f3c7f8875db61bcee3952d087ebefee7f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1237,7 +1237,7 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS _exit: return code; } -static int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { +int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; // version