diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h index 33cb8335a9655bacb5399ad0bf99c3d74ab96439..2d0caa5751605cfb7332bcd9ae8c67c6d3b0cd16 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbSttFileRW.h @@ -63,7 +63,7 @@ int32_t tsdbSttFileWriterOpen(const SSttFileWriterConfig *config, SSttFileWriter int32_t tsdbSttFileWriterClose(SSttFileWriter **writer, int8_t abort, TFileOpArray *opArray); int32_t tsdbSttFileWriteTSData(SSttFileWriter *writer, SRowInfo *row); int32_t tsdbSttFileWriteTSDataBlock(SSttFileWriter *writer, SBlockData *pBlockData); -int32_t tsdbSttFileWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData); +int32_t tsdbSttFileWriteDelRecord(SSttFileWriter *writer, const SDelRecord *record); struct SSttFileWriterConfig { STsdb *tsdb; diff --git a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h index 0998c89117a5592edd7695c391e42ec8d72601bc..35f39339d8e3f54a314556eec5977ee36d583944 100644 --- a/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h +++ b/source/dnode/vnode/src/tsdb/dev/inc/tsdbUtil.h @@ -63,8 +63,6 @@ int32_t tDelBlockInit(SDelBlock *delBlock); int32_t tDelBlockFree(SDelBlock *delBlock); int32_t tDelBlockClear(SDelBlock *delBlock); int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord); -int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size); -int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock); // STbStatisBlock ---------- #define STATIS_RECORD_NUM_ELEM 9 diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c index 1807f7fa24cdaa3310b78db7b540f6a2551577ac..be904ee8eaf5c67ead1a25480c7d186df7b27158 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbCommit.c @@ -217,45 +217,47 @@ _exit: static int32_t tsdbCommitDelData(SCommitter2 *committer) { int32_t code = 0; - int32_t lino; - - return 0; + int32_t lino = 0; -#if 0 - ASSERTS(0, "TODO: Not implemented yet"); + SMemTable *mem = committer->tsdb->imem; - int64_t nDel = 0; - SMemTable *pMem = committer->tsdb->imem; + if (mem->nDel == 0) goto _exit; - if (pMem->nDel == 0) { // no del data - goto _exit; - } + SRBTreeIter iter[1] = {tRBTreeIterCreate(committer->tsdb->imem->tbDataTree, 1)}; - for (int32_t iTbData = 0; iTbData < taosArrayGetSize(committer->aTbDataP); iTbData++) { - STbData *pTbData = (STbData *)taosArrayGetP(committer->aTbDataP, iTbData); + for (SRBTreeNode *node = tRBTreeIterNext(iter); node; node = tRBTreeIterNext(iter)) { + STbData *tbData = TCONTAINER_OF(node, STbData, rbtn); + SDelRecord record[1] = {{ + .suid = tbData->suid, + .uid = tbData->uid, + }}; - for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) { - if (pDelData->eKey < committer->ctx->minKey) continue; - if (pDelData->sKey > committer->ctx->maxKey) { - committer->ctx->nextKey = TMIN(committer->ctx->nextKey, pDelData->sKey); + for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) { + if (delData->eKey < committer->ctx->minKey) continue; + if (delData->sKey > committer->ctx->maxKey) { + committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey); continue; } - code = tsdbCommitWriteDelData(committer, pTbData->suid, pTbData->uid, pDelData->version, - pDelData->sKey /* TODO */, pDelData->eKey /* TODO */); + record->version = delData->version; + record->skey = TMAX(delData->sKey, committer->ctx->minKey); + if (delData->eKey > committer->ctx->maxKey) { + committer->ctx->nextKey = TMIN(committer->ctx->nextKey, committer->ctx->maxKey + 1); + record->ekey = committer->ctx->maxKey; + } else { + record->ekey = delData->eKey; + } + + code = tsdbSttFileWriteDelRecord(committer->sttWriter, record); // TODO TSDB_CHECK_CODE(code, lino, _exit); } } _exit: if (code) { - tsdbError("vgId:%d failed at line %d since %s", TD_VID(committer->tsdb->pVnode), lino, tstrerror(code)); - } else { - tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(committer->tsdb->pVnode), __func__, committer->ctx->fid, - pMem->nDel); + TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); } return code; -#endif } static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) { diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c index 89025f5a0ef24823aa86aa675070f6a80ca4ab27..feb937886a99a4cda30ea9124e217cdc071d99e9 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbSttFileRW.c @@ -440,51 +440,62 @@ _exit: } static int32_t tsdbSttFileDoWriteDelBlock(SSttFileWriter *writer) { - return 0; -#if 0 - if (writer->dData->nRow == 0) return 0; + if (DEL_BLOCK_SIZE(writer->dData) == 0) return 0; int32_t code = 0; - int32_t lino; + int32_t lino = 0; - SDelBlk delBlk[1]; + SDelBlk delBlk[1] = {{ + .numRec = DEL_BLOCK_SIZE(writer->dData), + .minTid = + { + .suid = TARRAY2_FIRST(writer->dData->suid), + .uid = TARRAY2_FIRST(writer->dData->uid), + }, + .maxTid = + { + .suid = TARRAY2_LAST(writer->dData->suid), + .uid = TARRAY2_LAST(writer->dData->uid), + }, + .minVer = TARRAY2_FIRST(writer->dData->version), + .maxVer = TARRAY2_FIRST(writer->dData->version), + .dp[0] = + { + .offset = writer->file->size, + .size = 0, + }, + }}; - delBlk->nRow = writer->sData->nRow; - delBlk->minTid.suid = writer->sData->aData[0][0]; - delBlk->minTid.uid = writer->sData->aData[1][0]; - delBlk->maxTid.suid = writer->sData->aData[0][writer->sData->nRow - 1]; - delBlk->maxTid.uid = writer->sData->aData[1][writer->sData->nRow - 1]; - delBlk->minVer = delBlk->maxVer = delBlk->maxVer = writer->sData->aData[2][0]; - for (int32_t iRow = 1; iRow < writer->sData->nRow; iRow++) { - if (delBlk->minVer > writer->sData->aData[2][iRow]) delBlk->minVer = writer->sData->aData[2][iRow]; - if (delBlk->maxVer < writer->sData->aData[2][iRow]) delBlk->maxVer = writer->sData->aData[2][iRow]; + for (int32_t i = 1; i < DEL_BLOCK_SIZE(writer->dData); i++) { + delBlk->minVer = TMIN(delBlk->minVer, TARRAY2_GET(writer->dData->version, i)); + delBlk->maxVer = TMAX(delBlk->maxVer, TARRAY2_GET(writer->dData->version, i)); } - delBlk->dp.offset = writer->file->size; - delBlk->dp.size = 0; // TODO + for (int32_t i = 0; i < ARRAY_SIZE(writer->dData->dataArr); i++) { + int32_t size; + code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&writer->dData->dataArr[i]), + TARRAY2_DATA_LEN(&writer->dData->dataArr[i]), TSDB_DATA_TYPE_BIGINT, writer->config->cmprAlg, + &writer->config->aBuf[0], 0, &size, &writer->config->aBuf[1]); + TSDB_CHECK_CODE(code, lino, _exit); - int64_t tsize = sizeof(int64_t) * writer->dData->nRow; - for (int32_t i = 0; i < ARRAY_SIZE(writer->dData->aData); i++) { - code = tsdbWriteFile(writer->fd, writer->file->size, (const uint8_t *)writer->dData->aData[i], tsize); + code = tsdbWriteFile(writer->fd, writer->file->size, writer->config->aBuf[0], size); TSDB_CHECK_CODE(code, lino, _exit); - delBlk->dp.size += tsize; - writer->file->size += tsize; + delBlk->size[i] = size; + delBlk->dp[0].size += size; + writer->file->size += size; } - tDelBlockDestroy(writer->dData); code = TARRAY2_APPEND_PTR(writer->delBlkArray, delBlk); TSDB_CHECK_CODE(code, lino, _exit); + tDelBlockClear(writer->dData); + _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, - tstrerror(code)); - } else { - // tsdbTrace(); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; -#endif } static int32_t tsdbSttFileDoWriteSttBlk(SSttFileWriter *writer) { @@ -531,7 +542,7 @@ _exit: static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) { int32_t code = 0; - int32_t lino; + int32_t lino = 0; writer->footer->delBlkPtr->offset = writer->file->size; writer->footer->delBlkPtr->size = TARRAY2_DATA_LEN(writer->delBlkArray); @@ -545,8 +556,7 @@ static int32_t tsdbSttFileDoWriteDelBlk(SSttFileWriter *writer) { _exit: if (code) { - tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(writer->config->tsdb->pVnode), __func__, lino, - tstrerror(code)); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } @@ -624,7 +634,6 @@ static int32_t tsdbSttFileDoUpdateHeader(SSttFileWriter *writer) { static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *opArray) { int32_t lino; int32_t code; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); code = tsdbSttFileDoWriteTSDataBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); @@ -668,7 +677,7 @@ static int32_t tsdbSttFWriterCloseCommit(SSttFileWriter *writer, TFileOpArray *o _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } @@ -837,42 +846,39 @@ _exit: return code; } -int32_t tsdbSttFileWriteDLData(SSttFileWriter *writer, TABLEID *tbid, SDelData *pDelData) { - ASSERTS(0, "TODO: Not implemented yet"); - +int32_t tsdbSttFileWriteDelRecord(SSttFileWriter *writer, const SDelRecord *record) { int32_t code; int32_t lino; - int32_t vid = TD_VID(writer->config->tsdb->pVnode); if (!writer->ctx->opened) { code = tsdbSttFWriterDoOpen(writer); return code; } - code = tsdbSttFileDoWriteTSDataBlock(writer); - TSDB_CHECK_CODE(code, lino, _exit); + // end time-series data write + if (writer->bData->nRow > 0) { + code = tsdbSttFileDoWriteTSDataBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } - code = tsdbSttFileDoWriteStatisBlock(writer); - TSDB_CHECK_CODE(code, lino, _exit); + if (STATIS_BLOCK_SIZE(writer->sData) > 0) { + code = tsdbSttFileDoWriteStatisBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); + } -#if 0 - writer->dData[0].aData[0][writer->dData[0].nRow] = tbid->suid; // suid - writer->dData[0].aData[1][writer->dData[0].nRow] = tbid->uid; // uid - writer->dData[0].aData[2][writer->dData[0].nRow] = pDelData->version; // version - writer->dData[0].aData[3][writer->dData[0].nRow] = pDelData->sKey; // skey - writer->dData[0].aData[4][writer->dData[0].nRow] = pDelData->eKey; // ekey - writer->dData[0].nRow++; + // write SDelRecord + code = tDelBlockPut(writer->dData, record); + TSDB_CHECK_CODE(code, lino, _exit); - if (writer->dData[0].nRow >= writer->config->maxRow) { - return tsdbSttFileDoWriteDelBlock(writer); - } else { - return 0; + // write SDelBlock if need + if (DEL_BLOCK_SIZE(writer->dData) >= writer->config->maxRow) { + code = tsdbSttFileDoWriteDelBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); } -#endif _exit: if (code) { - TSDB_ERROR_LOG(vid, lino, code); + TSDB_ERROR_LOG(TD_VID(writer->config->tsdb->pVnode), lino, code); } return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c index 5e318771c8bd315ffbb5460f4c557f197f7b9337..5d1ebb2429770d11774f05db2f987e3a3dcc1372 100644 --- a/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/dev/tsdbUtil.c @@ -45,16 +45,6 @@ int32_t tDelBlockPut(SDelBlock *delBlock, const SDelRecord *delRecord) { return 0; } -int32_t tDelBlockEncode(SDelBlock *delBlock, void *buf, int32_t size) { - // TODO - return 0; -} - -int32_t tDelBlockDecode(const void *buf, SDelBlock *delBlock) { - // TODO - return 0; -} - // STbStatisBlock ---------- int32_t tStatisBlockInit(STbStatisBlock *statisBlock) { for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->dataArr); ++i) {