From c398f9d2994f9e695ff41f1b3c0e3dee4844ca38 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 8 Feb 2023 16:11:47 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbCompact.c | 74 ++++++++++++++--------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 4645a14c65..848d40060f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -145,7 +145,7 @@ static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEI } } - // reader and write (TODO) + // writer code = tsdbUpdateTableSchema(pCompactor->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pCompactor->tbSkm); TSDB_CHECK_CODE(code, lino, _exit); @@ -154,13 +154,14 @@ static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEI code = tBlockDataInit(&pCompactor->bData, pId, pCompactor->tbSkm.pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); - if (!TABLE_SAME_SCHEMA(pCompactor->tbid.suid, pCompactor->tbid.uid, pId->suid, pId->uid)) { + if (!TABLE_SAME_SCHEMA(pCompactor->sData.suid, pCompactor->sData.uid, pId->suid, pId->uid)) { if (pCompactor->sData.nRow > 0) { code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); } - code = tBlockDataInit(&pCompactor->sData, pId /* TODO */, pCompactor->tbSkm.pTSchema, NULL, 0); + TABLEID tbid = {.suid = pId->suid, .uid = pId->suid ? 0 : pId->uid}; + code = tBlockDataInit(&pCompactor->sData, &tbid, pCompactor->tbSkm.pTSchema, NULL, 0); TSDB_CHECK_CODE(code, lino, _exit); } @@ -191,7 +192,6 @@ static int32_t tsdbCompactWriteTableDataEnd(STsdbCompactor *pCompactor) { TSDB_CHECK_CODE(code, lino, _exit); } } - tBlockDataClear(&pCompactor->bData); } else { code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg); @@ -199,7 +199,7 @@ static int32_t tsdbCompactWriteTableDataEnd(STsdbCompactor *pCompactor) { } } - if (pCompactor->mDataBlk.nItem) { + if (pCompactor->mDataBlk.nItem > 0) { SBlockIdx *pBlockIdx = (SBlockIdx *)taosArrayReserve(pCompactor->aBlockIdx, 1); if (pBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -268,21 +268,21 @@ static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *p int32_t code = 0; int32_t lino = 0; - SRowInfo rInfo; - if (pRowInfo == NULL) { - rInfo.suid = INT64_MAX; - rInfo.uid = INT64_MAX; - // rInfo.row = TSDBORW_V; - pRowInfo = &rInfo; - } - // start a new table data write if need - if (pRowInfo->uid != pCompactor->tbid.uid) { + if (pRowInfo == NULL || pRowInfo->uid != pCompactor->tbid.uid) { if (pCompactor->tbid.uid) { code = tsdbCompactWriteTableDataEnd(pCompactor); TSDB_CHECK_CODE(code, lino, _exit); } + if (pRowInfo == NULL) { + if (pCompactor->sData.nRow > 0) { + code = tsdbWriteSttBlock(pCompactor->pWriter, &pCompactor->sData, pCompactor->aSttBlk, pCompactor->cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); + } + return code; + } + code = tsdbCompactWriteTableDataStart(pCompactor, (TABLEID *)pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); } @@ -290,14 +290,14 @@ static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *p // check if row is deleted if (pCompactor->pDKey && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) goto _exit; - code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid); - TSDB_CHECK_CODE(code, lino, _exit); - - if (pCompactor->bData.nRow >= pCompactor->maxRows) { + if (tBlockDataTryUpsertRow(&pCompactor->bData, &pRowInfo->row, pRowInfo->uid) > pCompactor->maxRows) { code = tsdbWriteDataBlock(pCompactor->pWriter, &pCompactor->bData, &pCompactor->mDataBlk, pCompactor->cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); } + code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, @@ -367,7 +367,6 @@ static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pS /* tombstone */ pCompactor->iDelIdx = 0; - pCompactor->iSkyLine = 0; /* reader */ code = tsdbDataFReaderOpen(&pCompactor->pReader, pCompactor->pTsdb, pSet); @@ -376,9 +375,16 @@ static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pS code = tsdbOpenDataFileDataIter(pCompactor->pReader, &pCompactor->pIter); TSDB_CHECK_CODE(code, lino, _exit); + tRBTreeCreate(&pCompactor->rbt, tsdbDataIterCmprFn); if (pCompactor->pIter) { pCompactor->pIter->next = pCompactor->iterList; pCompactor->iterList = pCompactor->pIter; + + code = tsdbDataIterNext2(pCompactor->pIter, NULL); + TSDB_CHECK_CODE(code, lino, _exit); + + ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid); + tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); } for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { @@ -388,11 +394,15 @@ static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pS if (pCompactor->pIter) { pCompactor->pIter->next = pCompactor->iterList; pCompactor->iterList = pCompactor->pIter; + + code = tsdbDataIterNext2(pCompactor->pIter, NULL); + TSDB_CHECK_CODE(code, lino, _exit); + + ASSERT(pCompactor->pIter->rowInfo.suid || pCompactor->pIter->rowInfo.uid); + tRBTreePut(&pCompactor->rbt, &pCompactor->pIter->rbtn); } } - pCompactor->pIter = NULL; - tRBTreeCreate(&pCompactor->rbt, tsdbDataIterCmprFn); /* writer */ code = tsdbDataFWriterOpen(&pCompactor->pWriter, pCompactor->pTsdb, @@ -438,7 +448,8 @@ static int32_t tsdbCompactFileSetEnd(STsdbCompactor *pCompactor) { int32_t code = 0; int32_t lino = 0; - /* finish remaining data (TODO) */ + ASSERT(pCompactor->bData.nRow == 0); + ASSERT(pCompactor->sData.nRow == 0); /* update files */ code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); @@ -488,28 +499,33 @@ static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor, SDFileSet *pSet) { // do compact SRowInfo *pRowInfo; - for (;;) { + do { code = tsdbCompactNextRow(pCompactor, &pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbCompactWriteTableData(pCompactor, pRowInfo); TSDB_CHECK_CODE(code, lino, _exit); - - if (pRowInfo == NULL) break; - } + } while (pRowInfo); // end compact code = tsdbCompactFileSetEnd(pCompactor); TSDB_CHECK_CODE(code, lino, _exit); _exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pCompactor->pTsdb->pVnode), __func__, lino, + tstrerror(code), pCompactor->fid); + if (pCompactor->pWriter) tsdbDataFWriterClose(&pCompactor->pWriter, 0); + while ((pCompactor->pIter = pCompactor->iterList)) { + pCompactor->iterList = pCompactor->pIter->next; + tsdbCloseDataIter2(pCompactor->pIter); + } + if (pCompactor->pReader) tsdbDataFReaderClose(&pCompactor->pReader); + } return code; } static void tsdbEndCompact(STsdbCompactor *pCompactor) { - int32_t code = 0; - int32_t lino = 0; - // writer tBlockDataDestroy(&pCompactor->sData); tBlockDataDestroy(&pCompactor->bData); -- GitLab