diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 4c8208faa7cd819d581eee556b6bb5cf3eba0b7d..c07bf2d6c841d9b94947bafc5d7e854c4cb49b79 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -172,6 +172,7 @@ int32_t tGetDelData(uint8_t *p, void *ph); void tMapDataReset(SMapData *pMapData); void tMapDataClear(SMapData *pMapData); int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)); +int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo); void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t (*tItemCmprFn)(const void *, const void *), void *pItem); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 8542eca8c9ef62d3303a18956d9ca629268dd388..08688b442b052b05f17a3f1f6680b600bf868a76 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -591,7 +591,7 @@ _err: return code; } -static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { +static int32_t tsdbMergeCommitDataBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) { int32_t code = 0; STbData *pTbData = pIter->pTbData; SBlockData *pBlockDataR = &pCommitter->dReader.bData; @@ -941,25 +941,9 @@ _err: return code; } -static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { +static int32_t tsdbMergeCommitData(SCommitter *pCommitter, STbDataIter *pIter) { int32_t code = 0; - ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0); - ASSERT(pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, pTbData) >= 0); - - // merge commit table data - STbDataIter iter = {0}; - STbDataIter *pIter = &iter; - TSDBROW *pRow; - - tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, pIter); - pRow = tsdbTbDataIterGet(pIter); - if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { - pRow = NULL; - } - - if (pRow == NULL) goto _exit; - int32_t iBlock = 0; SBlock block; SBlock *pBlock = █ @@ -1010,7 +994,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock); if (code) goto _err; } else { - code = tsdbMergeCommitData(pCommitter, pIter, pBlock); + code = tsdbMergeCommitDataBlock(pCommitter, pIter, pBlock); if (code) goto _err; } @@ -1041,11 +1025,46 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { } } - // .data append and .last merge - code = tsdbMergeCommitLast(pCommitter, pIter); +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { + int32_t code = 0; + + ASSERT(pCommitter->dReader.pBlockIdx == NULL || tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) >= 0); + + // merge commit table data + STbDataIter iter = {0}; + TSDBROW *pRow; + + tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = VERSION_MIN}, 0, &iter); + pRow = tsdbTbDataIterGet(&iter); + if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) { + pRow = NULL; + } + + if (pRow == NULL) { + if (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, pTbData) == 0) { + code = tMapDataCopy(&pCommitter->dReader.mBlock, &pCommitter->dWriter.mBlock); + if (code) goto _err; + } + + goto _exit; + } + + // commit data + code = tsdbMergeCommitData(pCommitter, &iter); if (code) goto _err; - // end + // commit last + code = tsdbMergeCommitLast(pCommitter, &iter); + if (code) goto _err; + +_exit: if (pCommitter->dWriter.mBlock.nItem > 0) { SBlockIdx blockIdx = {.suid = pTbData->suid, .uid = pTbData->uid}; code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); @@ -1056,9 +1075,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) { goto _err; } } - -_exit: - pRow = tsdbTbDataIterGet(pIter); + pRow = tsdbTbDataIterGet(&iter); if (pRow) { pCommitter->nextKey = TMIN(pCommitter->nextKey, TSDBROW_TS(pRow)); } @@ -1126,71 +1143,6 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { if (code) goto _err; } - // .last - while (true) { - if (pCommitter->dReader.pRowInfo == NULL || tTABLEIDCmprFn(pCommitter->dReader.pRowInfo, &toTable) >= 0) break; - - SBlockData *pBlockDataR = &pCommitter->dReader.bDatal; - SBlockData *pBlockDataW = &pCommitter->dWriter.bDatal; - tb_uid_t suid = pCommitter->dReader.pRowInfo->suid; - tb_uid_t uid = pCommitter->dReader.pRowInfo->uid; - - ASSERT((pBlockDataR->suid && !pBlockDataR->uid) || (!pBlockDataR->suid && pBlockDataR->uid)); - ASSERT(pBlockDataR->nRow > 0); - - // commit and reset block data schema if need - if (pBlockDataW->suid || pBlockDataW->uid) { - if (pBlockDataW->suid != suid || pBlockDataW->suid == 0) { - if (pBlockDataW->nRow > 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - } - tBlockDataReset(pBlockDataW); - } - } - - // set block data schema if need - if (pBlockDataW->suid == 0 && pBlockDataW->uid == 0) { - code = tsdbCommitterUpdateTableSchema(pCommitter, suid, uid); - if (code) goto _err; - - code = tBlockDataInit(pBlockDataW, suid, suid ? 0 : uid, pCommitter->skmTable.pTSchema); - if (code) goto _err; - } - - // check if it can make sure that one table data in one block - int32_t nRow = 0; - if (pBlockDataR->suid) { - int32_t iRow = pCommitter->dReader.iRow; - while ((iRow < pBlockDataR->nRow) && (pBlockDataR->aUid[iRow] == uid)) { - nRow++; - iRow++; - } - } else { - ASSERT(pCommitter->dReader.iRow == 0); - nRow = pBlockDataR->nRow; - } - - ASSERT(nRow > 0 && nRow < pCommitter->minRow); - - if (pBlockDataW->nRow + nRow > pCommitter->maxRow) { - ASSERT(pBlockDataW->nRow > 0); - - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - } - - while (nRow > 0) { - code = tBlockDataAppendRow(pBlockDataW, &pCommitter->dReader.pRowInfo->row, NULL, uid); - if (code) goto _err; - - code = tsdbCommitterNextLastRow(pCommitter); - if (code) goto _err; - - nRow--; - } - } - return code; _err: diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 6db9d5e6f40c5d35e52d90dd86b28f4cb7a94676..49778542e39b80eec770989e8a7cdb28cd0d1899 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -51,6 +51,22 @@ _exit: return code; } +int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo) { + int32_t code = 0; + + pTo->nItem = pFrom->nItem; + pTo->nData = pFrom->nData; + code = tRealloc((uint8_t **)&pTo->aOffset, sizeof(int32_t) * pFrom->nItem); + if (code) goto _exit; + code = tRealloc(&pTo->pData, pFrom->nData); + if (code) goto _exit; + memcpy(pTo->aOffset, pFrom->aOffset, sizeof(int32_t) * pFrom->nItem); + memcpy(pTo->pData, pFrom->pData, pFrom->nData); + +_exit: + return code; +} + int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) { int32_t code = 0;