From bbf92eeba6d5e15a3fb1eb2860f55aa26f991888 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 24 Jun 2022 10:10:41 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 206 ++++++++---------- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 25 +-- source/dnode/vnode/src/tsdb/tsdbUtil.c | 9 +- 3 files changed, 111 insertions(+), 129 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 09ef37ab62..1fba5005a1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -340,88 +340,6 @@ _err: return code; } -#define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey))) - -// static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey, -// bool toDataOnly) { -// int32_t code = 0; -// TSDBROW *pRow; -// STSchema *pTSchema = NULL; // TODO -// TSDBKEY key; -// SBlock *pBlock = &pCommitter->nBlock; - -// if (pIter == NULL) goto _exit; - -// tBlockReset(pBlock); -// tBlockDataReset(&pCommitter->nBlockData); -// while (true) { -// pRow = tsdbTbDataIterGet(pIter); - -// if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) { -// if (pCommitter->nBlockData.nRow == 0) { -// break; -// } else { -// goto _write_block_data; -// } -// } - -// // update schema -// if (pTSchema == NULL || pTSchema->version != TSDBROW_SVERSION(pRow)) { -// // TODO -// // pTSchema = NULL; -// } - -// // append row -// code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, pTSchema); -// if (code) goto _err; - -// // update info -// key = tsdbRowKey(pRow); -// if (tsdbKeyCmprFn(&key, &pBlock->info.maxKey) > 0) pBlock->info.maxKey = key; -// if (tsdbKeyCmprFn(&key, &pBlock->info.minKey) < 0) pBlock->info.minKey = key; -// if (key.version > pBlock->info.maxVersion) pBlock->info.maxVersion = key.version; -// if (key.version < pBlock->info.minVerion) pBlock->info.minVerion = key.version; - -// // iter next -// tsdbTbDataIterNext(pIter); - -// // check write -// if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) { -// continue; -// } - -// _write_block_data: -// if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) { -// pCommitter->nBlock.last = 1; -// } else { -// pCommitter->nBlock.last = 0; -// } - -// code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, pBlock); -// if (code) goto _err; - -// code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); -// if (code) goto _err; - -// // update info -// if (tsdbKeyCmprFn(&pBlock->info.minKey, &pBlockIdx->info.minKey) < 0) pBlock->info.minKey = -// pBlockIdx->info.minKey; if (tsdbKeyCmprFn(&pBlock->info.maxKey, &pBlockIdx->info.maxKey) < 0) pBlock->info.maxKey -// = pBlockIdx->info.maxKey; if (pBlock->info.minVerion < pBlockIdx->info.minVerion) pBlockIdx->info.minVerion = -// pBlock->info.minVerion; if (pBlock->info.maxVersion < pBlockIdx->info.maxVersion) pBlockIdx->info.maxVersion = -// pBlock->info.maxVersion; - -// tBlockReset(pBlock); -// tBlockDataReset(&pCommitter->nBlockData); -// } - -// _exit: -// return code; - -// _err: -// tsdbError("vgId:%d commit memory data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); -// return code; -// } - // static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) { // int32_t nRow = 0; // TSDBROW *pRow; @@ -755,60 +673,61 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) { if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) goto _exit; // main loop - SMapData *mBlock = &pCommitter->nBlockMap; SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pTbData->suid, .uid = pTbData->uid}; + SMapData *mBlock = &pCommitter->nBlockMap; SBlock *pBlock = &pCommitter->nBlock; SBlockData *pBlockData = &pCommitter->nBlockData; + TSKEY lastTS; - tMapDataReset(mBlock); tBlockIdxReset(pBlockIdx); + tMapDataReset(mBlock); tBlockReset(pBlock); tBlockDataReset(pBlockData); - while (pRow != NULL && TSDBROW_TS(pRow) <= pCommitter->maxKey) { + lastTS = TSKEY_MIN; + while (1) { + if (pRow == NULL || TSDBROW_TS(pRow) > pCommitter->maxKey) { + if (pBlockData->nRow > 0) { + goto _write_block; + } else { + break; + } + } + + // update schema code = tsdbCommitterUpdateSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow)); if (code) goto _err; + // append code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema); if (code) goto _err; + // update pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow)); pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow)); pBlock->nRow++; + if (TSDBROW_TS(pRow) == lastTS) pBlock->hasDup = 1; + lastTS = TSDBROW_TS(pRow); // next tsdbTbDataIterNext(pIter); pRow = tsdbTbDataIterGet(pIter); - if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { - ASSERT(0); - // // SBlock - // pBlock->last = 0; - // pBlock->cmprAlg = pCommitter->cmprAlg; - // code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock); - // if (code) goto _err; - - // // SBlockIdx - // pBlockIdx->minKey = TMIN(pBlockIdx->minKey, pBlock->minKey.ts); - // pBlockIdx->maxKey = TMAX(pBlockIdx->maxKey, pBlock->maxKey.ts); - // pBlockIdx->minVersion = TMIN(pBlockIdx->minVersion, pBlock->minVersion); - // pBlockIdx->maxVersion = TMAX(pBlockIdx->maxVersion, pBlock->maxVersion); - - // tBlockReset(pBlock); - // tBlockDataReset(pBlockData); - } - } + // check + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block; + continue; - if (pBlockData->nRow > 0) { - // SBlock + _write_block: row = tBlockDataFirstRow(pBlockData); if (tsdbKeyCmprFn(&pBlock->minKey, &TSDBROW_KEY(&row)) > 0) pBlock->minKey = TSDBROW_KEY(&row); row = tBlockDataLastRow(pBlockData); if (tsdbKeyCmprFn(&pBlock->maxKey, &TSDBROW_KEY(&row)) < 0) pBlock->maxKey = TSDBROW_KEY(&row); - pBlock->last = 1; + pBlock->last = pBlockData->nRow < pCommitter->minRow ? 1 : 0; pBlock->cmprAlg = pCommitter->cmprAlg; code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock); if (code) goto _err; + // Design SMA and write SMA to file + // SBlockIdx code = tMapDataPutItem(mBlock, pBlock, tPutBlock); if (code) goto _err; @@ -816,6 +735,10 @@ static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) { pBlockIdx->maxKey = TMAX(pBlockIdx->maxKey, pBlock->maxKey.ts); pBlockIdx->minVersion = TMIN(pBlockIdx->minVersion, pBlock->minVersion); pBlockIdx->maxVersion = TMAX(pBlockIdx->maxVersion, pBlock->maxVersion); + + tBlockReset(pBlock); + tBlockDataReset(pBlockData); + lastTS = TSKEY_MIN; } // write block @@ -834,6 +757,65 @@ _err: return code; } +static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx) { + int32_t code = 0; + SMapData *mBlockO = &pCommitter->oBlockMap; + SMapData *mBlockN = &pCommitter->nBlockMap; + SBlock *pBlockO = &pCommitter->oBlock; + SBlock *pBlockN = &pCommitter->nBlock; + SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = oBlockIdx->suid, + .uid = oBlockIdx->uid, + .maxKey = oBlockIdx->maxKey, + .minKey = oBlockIdx->minKey, + .minVersion = oBlockIdx->minVersion, + .maxVersion = oBlockIdx->maxVersion, + .offset = -1, + .size = -1}; + SBlockData *pBlockDataO = &pCommitter->oBlockData; + + // read + code = tsdbReadBlock(pCommitter->pReader, oBlockIdx, mBlockO, NULL); + if (code) goto _err; + + // loop to add to new + tMapDataReset(mBlockN); + for (int32_t iBlock = 0; iBlock < mBlockO->nItem; iBlock++) { + tMapDataGetItemByIdx(mBlockO, iBlock, pBlockO, tGetBlock); + + if (pBlockO->last) { + ASSERT(iBlock == mBlockO->nItem - 1); + code = tsdbReadBlockData(pCommitter->pReader, oBlockIdx, pBlockO, pBlockDataO, NULL, -1, NULL, NULL); + if (code) goto _err; + + tBlockReset(pBlockN); + pBlockN->last = 1; + pBlockN->cmprAlg = pBlockO->cmprAlg; + code = tsdbWriteBlockData(pCommitter->pWriter, pBlockDataO, NULL, NULL, pBlockIdx, pBlockN); + if (code) goto _err; + + code = tMapDataPutItem(mBlockN, pBlockN, tPutBlock); + if (code) goto _err; + } else { + code = tMapDataPutItem(mBlockN, pBlockO, tPutBlock); + if (code) goto _err; + } + } + + // SBlock + code = tsdbWriteBlock(pCommitter->pWriter, mBlockN, NULL, pBlockIdx); + if (code) goto _err; + + // SBlockIdx + code = tMapDataPutItem(&pCommitter->nBlockIdxMap, pBlockIdx, tPutBlockIdx); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d tsdb Commit disk data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + return code; +} + static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { int32_t code = 0; int32_t c; @@ -878,8 +860,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { } } else if (c < 0) { // commit memory data - // code = tsdbCommitMemoryData(pCommitter, pTbData); - // if (code) goto _err; + code = tsdbCommitMemoryData(pCommitter, pTbData); + if (code) goto _err; iTbData++; if (iTbData < nTbData) { @@ -889,8 +871,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { } } else { // commit disk data - // code = tsdbCommitDiskData(pCommitter, pBlockIdx); - // if (code) goto _err; + code = tsdbCommitDiskData(pCommitter, pBlockIdx); + if (code) goto _err; iBlockIdx++; if (iBlockIdx < nBlockIdx) { @@ -904,8 +886,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { // disk while (pBlockIdx) { // commit disk data - // code = tsdbCommitDiskData(pCommitter, pBlockIdx); - // if (code) goto _err; + code = tsdbCommitDiskData(pCommitter, pBlockIdx); + if (code) goto _err; iBlockIdx++; if (iBlockIdx < nBlockIdx) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 984f1f11a3..cad2404e63 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -541,13 +541,11 @@ _err: } int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock, uint8_t **ppBuf) { - int32_t code = 0; - int64_t offset = pBlockIdx->offset; - int64_t size = pBlockIdx->size; - int64_t n; - uint32_t delimiter; - int64_t suid; - int64_t uid; + int32_t code = 0; + int64_t offset = pBlockIdx->offset; + int64_t size = pBlockIdx->size; + int64_t n; + SBlockDataHdr hdr; // alloc if (!ppBuf) ppBuf = &mBlock->pBuf; @@ -577,13 +575,12 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl } // decode - n = 0; - n += tGetU32(*ppBuf + n, &delimiter); - ASSERT(delimiter == TSDB_FILE_DLMT); - n += tGetI64(*ppBuf + n, &suid); - ASSERT(suid == pBlockIdx->suid); - n += tGetI64(*ppBuf + n, &uid); - ASSERT(uid == pBlockIdx->uid); + hdr = *(SBlockDataHdr *)(*ppBuf); + ASSERT(hdr.delimiter == TSDB_FILE_DLMT); + ASSERT(hdr.suid == pBlockIdx->suid); + ASSERT(hdr.uid == pBlockIdx->uid); + + n = sizeof(hdr); n += tGetMapData(*ppBuf + n, mBlock); ASSERT(n + sizeof(TSCKSUM) == size); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index d64e417385..73b7864157 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -27,9 +27,12 @@ void tMapDataReset(SMapData *pMapData) { } void tMapDataClear(SMapData *pMapData) { - tsdbFree(pMapData->pOfst); - tsdbFree(pMapData->pData); - tsdbFree(pMapData->pBuf); + if (pMapData->pBuf) { + tsdbFree(pMapData->pBuf); + } else { + tsdbFree(pMapData->pOfst); + tsdbFree(pMapData->pData); + } } int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) { -- GitLab