diff --git a/include/util/trbtree.h b/include/util/trbtree.h index 7c93c8dd8d65f2c4947019f370c0933e6ad81ee0..e2264194401bee65a22f0301f0339cb76a8d7356 100644 --- a/include/util/trbtree.h +++ b/include/util/trbtree.h @@ -33,9 +33,12 @@ typedef int32_t (*tRBTreeCmprFn)(const SRBTreeNode *, const SRBTreeNode *); #define tRBTreeMax(T) ((T)->max == ((T)->NIL) ? NULL : (T)->max) void tRBTreeCreate(SRBTree *pTree, tRBTreeCmprFn cmprFn); +void tRBTreeClear(SRBTree *pTree); SRBTreeNode *tRBTreePut(SRBTree *pTree, SRBTreeNode *z); void tRBTreeDrop(SRBTree *pTree, SRBTreeNode *z); SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey); +SRBTreeNode *tRBTreeDropMin(SRBTree *pTree); +SRBTreeNode *tRBTreeDropMax(SRBTree *pTree); SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode); // SRBTreeIter ============================================= diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d387d2dfbe02f53c90851b99ce2b35f51131b4fb..e8b8332aac3805f2fc176ee06bcfa5dc172a00ee 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -320,6 +320,7 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, const SDiskData **ppDiskData, #define TSDB_TOMB_FILE_DATA_ITER 3 #define TSDB_FILTER_FLAG_BY_VERSION 0x1 +#define TSDB_FILTER_FLAG_BY_TABLEID 0x2 #define TSDB_RBTN_TO_DATA_ITER(pNode) ((STsdbDataIter2 *)(((char *)pNode) - offsetof(STsdbDataIter2, rbtn))) /* open */ @@ -906,6 +907,7 @@ struct STsdbFilterInfo { int32_t flag; int64_t sver; int64_t ever; + TABLEID tbid; }; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 848d40060f944ef669fd5bc47040b3f14af179b9..75023f3ef219a7723f486ea8bb9a4d20e14f4e36 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -310,43 +310,67 @@ _exit: return code; } +static bool tsdbCompactTableIsDropped(STsdbCompactor *pCompactor) { + SMetaInfo info; + + if (pCompactor->pIter->rowInfo.uid == pCompactor->tbid.uid) return false; + if (metaGetInfo(pCompactor->pTsdb->pVnode->pMeta, pCompactor->tbid.uid, &info, NULL)) { + return true; + } + return false; +} static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo) { int32_t code = 0; int32_t lino = 0; - if (pCompactor->pIter) { - code = tsdbDataIterNext2(pCompactor->pIter, NULL /* TODO */); - TSDB_CHECK_CODE(code, lino, _exit); + for (;;) { + if (pCompactor->pIter) { + code = tsdbDataIterNext2(pCompactor->pIter, NULL); + TSDB_CHECK_CODE(code, lino, _exit); - if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) { - pCompactor->pIter = NULL; - } else { - SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt); - if (pNode) { - int32_t c = tsdbDataIterCmprFn(&pCompactor->pIter->rbtn, pNode); - if (c > 0) { - tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); - pCompactor->pIter = NULL; - } else if (c == 0) { - ASSERT(0); + if (pCompactor->pIter->rowInfo.suid == 0 && pCompactor->pIter->rowInfo.uid == 0) { + pCompactor->pIter = NULL; + } else { + SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt); + if (pNode) { + int32_t c = tsdbDataIterCmprFn(&pCompactor->pIter->rbtn, pNode); + if (c > 0) { + tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); + pCompactor->pIter = NULL; + } else if (c == 0) { + ASSERT(0); + } } } } - } - if (pCompactor->pIter == NULL) { - SRBTreeNode *pNode = tRBTreeMin(&pCompactor->rbt); - if (pNode) { - tRBTreeDrop(&pCompactor->rbt, pNode); - pCompactor->pIter = TSDB_RBTN_TO_DATA_ITER(pNode); + if (pCompactor->pIter == NULL) { + SRBTreeNode *pNode = tRBTreeDropMin(&pCompactor->rbt); + if (pNode) { + pCompactor->pIter = TSDB_RBTN_TO_DATA_ITER(pNode); + } } - } - if (ppRowInfo) { if (pCompactor->pIter) { - *ppRowInfo = &pCompactor->pIter->rowInfo; + if (tsdbCompactTableIsDropped(pCompactor)) { + TABLEID tbid = {.suid = pCompactor->pIter->rowInfo.suid, .uid = pCompactor->pIter->rowInfo.uid}; + tRBTreeClear(&pCompactor->rbt); + for (pCompactor->pIter = pCompactor->iterList; pCompactor->pIter; pCompactor->pIter = pCompactor->pIter->next) { + code = tsdbDataIterNext2(pCompactor->pIter, + &(STsdbFilterInfo){.flag = TSDB_FILTER_FLAG_BY_TABLEID, .tbid = tbid}); + TSDB_CHECK_CODE(code, lino, _exit); + + if (pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid) { + tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); + } + } + } else { + *ppRowInfo = &pCompactor->pIter->rowInfo; + break; + } } else { *ppRowInfo = NULL; + break; } } @@ -497,7 +521,7 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor, SDFileSet *pSet) { code = tsdbCompactFileSetStart(pCompactor, pSet); TSDB_CHECK_CODE(code, lino, _exit); - // do compact + // do compact, end with a NULL row SRowInfo *pRowInfo; do { code = tsdbCompactNextRow(pCompactor, &pRowInfo); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataIter.c b/source/dnode/vnode/src/tsdb/tsdbDataIter.c index 491efcd4ed3c299b54e1943bde1fdfcfc8670326..2f49781276082dad126a1d2a1c6a396b7fd9dafe 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataIter.c @@ -202,6 +202,13 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* for (;;) { while (pIter->dIter.iRow < pIter->dIter.bData.nRow) { if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID) { + if (pFilterInfo->tbid.uid == pIter->dIter.bData.uid) { + pIter->dIter.iRow = pIter->dIter.bData.nRow; + continue; + } + } + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { if (pIter->dIter.bData.aVersion[pIter->dIter.iRow] < pFilterInfo->sver || pIter->dIter.bData.aVersion[pIter->dIter.iRow] > pFilterInfo->ever) { @@ -211,8 +218,8 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* } } - pIter->rowInfo.suid = pIter->dIter.bData.suid; - pIter->rowInfo.uid = pIter->dIter.bData.uid; + ASSERT(pIter->rowInfo.suid == pIter->dIter.bData.suid); + ASSERT(pIter->rowInfo.uid = pIter->dIter.bData.uid); pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->dIter.bData, pIter->dIter.iRow); pIter->dIter.iRow++; goto _exit; @@ -225,6 +232,13 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* // filter if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID) { + if (tTABLEIDCmprFn(&pFilterInfo->tbid, &pIter->rowInfo) == 0) { + pIter->dIter.iDataBlk = pIter->dIter.mDataBlk.nItem; + continue; + } + } + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { if (pFilterInfo->sver > dataBlk.maxVer || pFilterInfo->ever < dataBlk.minVer) { pIter->dIter.iDataBlk++; @@ -248,9 +262,22 @@ static int32_t tsdbDataFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* if (pIter->dIter.iBlockIdx < taosArrayGetSize(pIter->dIter.aBlockIdx)) { SBlockIdx* pBlockIdx = taosArrayGet(pIter->dIter.aBlockIdx, pIter->dIter.iBlockIdx); + if (pFilterInfo && (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID)) { + int32_t c = tTABLEIDCmprFn(pBlockIdx, &pFilterInfo->tbid); + if (c == 0) { + pIter->dIter.iBlockIdx++; + continue; + } else if (c < 0) { + ASSERT(0); + } + } + code = tsdbReadDataBlk(pIter->dIter.pReader, pBlockIdx, &pIter->dIter.mDataBlk); TSDB_CHECK_CODE(code, lino, _exit); + pIter->rowInfo.suid = pBlockIdx->suid; + pIter->rowInfo.uid = pBlockIdx->uid; + pIter->dIter.iBlockIdx++; pIter->dIter.iDataBlk = 0; @@ -277,6 +304,14 @@ static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* p for (;;) { while (pIter->sIter.iRow < pIter->sIter.bData.nRow) { if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID) { + int64_t uid = pIter->sIter.bData.uid ? pIter->sIter.bData.uid : pIter->sIter.bData.aUid[pIter->sIter.iRow]; + if (pFilterInfo->tbid.uid == uid) { + pIter->sIter.iRow++; + continue; + } + } + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { if (pFilterInfo->sver > pIter->sIter.bData.aVersion[pIter->sIter.iRow] || pFilterInfo->ever < pIter->sIter.bData.aVersion[pIter->sIter.iRow]) { @@ -298,6 +333,14 @@ static int32_t tsdbSttFileDataIterNext(STsdbDataIter2* pIter, STsdbFilterInfo* p SSttBlk* pSttBlk = taosArrayGet(pIter->sIter.aSttBlk, pIter->sIter.iSttBlk); if (pFilterInfo) { + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_TABLEID) { + if (pSttBlk->suid == pFilterInfo->tbid.suid && pSttBlk->minUid == pFilterInfo->tbid.uid && + pSttBlk->maxUid == pFilterInfo->tbid.uid) { + pIter->sIter.iSttBlk++; + continue; + } + } + if (pFilterInfo->flag & TSDB_FILTER_FLAG_BY_VERSION) { if (pFilterInfo->sver > pSttBlk->maxVer || pFilterInfo->ever < pSttBlk->minVer) { pIter->sIter.iSttBlk++; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index aaa707dbb4348252ef92b2da5c78d53ccf850eb2..eb32ef489c6e3854871d1171ad4373343f0ab070 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1250,10 +1250,7 @@ _exit: int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) { if (pBlockData->nRow == 0) { return 1; - } - - int64_t luid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; - if (luid == uid && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { + } else if (pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { return pBlockData->nRow; } else { return pBlockData->nRow + 1; @@ -1261,31 +1258,7 @@ int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t ui } int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { - int32_t code = 0; - - ASSERT(pBlockData->suid || pBlockData->uid); - - if (pBlockData->nRow == 0) { - pBlockData->uid = uid; - return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid); - } - - if (pBlockData->uid && pBlockData->uid != uid) { - ASSERT(pBlockData->suid); - - code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1)); - if (code) return code; - - for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - pBlockData->aUid[iRow] = pBlockData->uid; - } - - pBlockData->uid = 0; - } - - // decide append/update row - int64_t luid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; - if (luid == uid && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { + if (pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { return tBlockDataUpdateRow(pBlockData, pRow, pTSchema); } else { return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid); diff --git a/source/util/src/trbtree.c b/source/util/src/trbtree.c index ffae5441aa088f2cc71710b301d5243cb3975235..e7386d5912dd83c5a76af3c902bcd910b5ffef87 100644 --- a/source/util/src/trbtree.c +++ b/source/util/src/trbtree.c @@ -147,6 +147,10 @@ static SRBTreeNode *tRBTreePredecessor(SRBTree *pTree, SRBTreeNode *pNode) { void tRBTreeCreate(SRBTree *pTree, tRBTreeCmprFn cmprFn) { pTree->cmprFn = cmprFn; + tRBTreeClear(pTree); +} + +void tRBTreeClear(SRBTree *pTree) { pTree->n = 0; pTree->NIL = &pTree->NILNODE; pTree->NIL->color = BLACK; @@ -423,6 +427,22 @@ SRBTreeNode *tRBTreeDropByKey(SRBTree *pTree, void *pKey) { return pNode; } +SRBTreeNode *tRBTreeDropMin(SRBTree *pTree) { + SRBTreeNode *pNode = tRBTreeMin(pTree); + if (pNode) { + tRBTreeDrop(pTree, pNode); + } + return pNode; +} + +SRBTreeNode *tRBTreeDropMax(SRBTree *pTree) { + SRBTreeNode *pNode = tRBTreeMax(pTree); + if (pNode) { + tRBTreeDrop(pTree, pNode); + } + return pNode; +} + SRBTreeNode *tRBTreeGet(SRBTree *pTree, const SRBTreeNode *pKeyNode) { SRBTreeNode *pNode = pTree->root;