diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b1ff4c9d3f3e034e2a56fac8ac2fcd1b54f4c918..aee46d421ebe22892ef2ab12ca6fc3e77e25f33a 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -77,6 +77,9 @@ typedef struct STsdbFSState STsdbFSState; #define VERSION_MIN 0 #define VERSION_MAX INT64_MAX +#define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN}) +#define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX}) + // tsdbUtil.c ============================================================================================== // TSDBROW #define TSDBROW_SVERSION(ROW) TD_ROW_SVER((ROW)->pTSRow) @@ -110,14 +113,15 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo); int32_t tPutBlockCol(uint8_t *p, void *ph); int32_t tGetBlockCol(uint8_t *p, void *ph); // SBlock -#define tBlockInit() ((SBlock){.info = tKEYINFOInit()}) +#define tBlockInit() ((SBlock){0}) void tBlockReset(SBlock *pBlock); void tBlockClear(SBlock *pBlock); int32_t tPutBlock(uint8_t *p, void *ph); int32_t tGetBlock(uint8_t *p, void *ph); int32_t tBlockCmprFn(const void *p1, const void *p2); // SBlockIdx -#define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()}) +// #define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()}) +void tBlockIdxReset(SBlockIdx *pBlockIdx); int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph); // SColdata @@ -328,7 +332,10 @@ struct TSDBROW { struct SBlockIdx { int64_t suid; int64_t uid; - KEYINFO info; + TSKEY minKey; + TSKEY maxKey; + int64_t minVersion; + int64_t maxVersion; int64_t offset; int64_t size; }; @@ -358,7 +365,10 @@ typedef struct { } SSubBlock; struct SBlock { - KEYINFO info; + TSDBKEY minKey; + TSDBKEY maxKey; + int64_t minVersion; + int64_t maxVersion; int32_t nRow; int8_t last; int8_t hasDup; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 6ed12e05f803a2aa90ecda20b7793bd32055829f..c8b198492ab89abdd552687a515e8162a5a6bed0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -143,7 +143,7 @@ _err: static int32_t tsdbCommitTableDel(SCommitter *pCommitter, STbData *pTbData, SDelIdx *pDelIdx) { int32_t code = 0; - SDelData *pDelData; + SDelData *pDelData = &(SDelData){}; tb_uid_t suid; tb_uid_t uid; SDelIdx delIdx; // TODO @@ -338,366 +338,419 @@ _err: #define ROW_END(pRow, maxKey) (((pRow) == NULL) || ((pRow)->pTSRow->ts > (maxKey))) -static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey, - bool toDataOnly) { - int32_t code = 0; - TSDBROW *pRow; - STSchema *pTSchema = NULL; // TODO - TSDBKEY key; - SBlock *pBlock = &pCommitter->nBlock; - - if (pIter == NULL) goto _exit; - - tBlockReset(pBlock); - tBlockDataReset(&pCommitter->nBlockData); - while (true) { - pRow = tsdbTbDataIterGet(pIter); - - if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) { - if (pCommitter->nBlockData.nRow == 0) { - break; - } else { - goto _write_block_data; - } - } - - // update schema - if (pTSchema == NULL || pTSchema->version != TSDBROW_SVERSION(pRow)) { - // TODO - // pTSchema = NULL; - } - - // append row - code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, pTSchema); - if (code) goto _err; - - // update info - key = tsdbRowKey(pRow); - if (tsdbKeyCmprFn(&key, &pBlock->info.maxKey) > 0) pBlock->info.maxKey = key; - if (tsdbKeyCmprFn(&key, &pBlock->info.minKey) < 0) pBlock->info.minKey = key; - if (key.version > pBlock->info.maxVersion) pBlock->info.maxVersion = key.version; - if (key.version < pBlock->info.minVerion) pBlock->info.minVerion = key.version; - - // iter next - tsdbTbDataIterNext(pIter); - - // check write - if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) { - continue; - } - - _write_block_data: - if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) { - pCommitter->nBlock.last = 1; - } else { - pCommitter->nBlock.last = 0; - } - - code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, pBlock); - if (code) goto _err; - - code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); - if (code) goto _err; - - // update info - if (tsdbKeyCmprFn(&pBlock->info.minKey, &pBlockIdx->info.minKey) < 0) pBlock->info.minKey = pBlockIdx->info.minKey; - if (tsdbKeyCmprFn(&pBlock->info.maxKey, &pBlockIdx->info.maxKey) < 0) pBlock->info.maxKey = pBlockIdx->info.maxKey; - if (pBlock->info.minVerion < pBlockIdx->info.minVerion) pBlockIdx->info.minVerion = pBlock->info.minVerion; - if (pBlock->info.maxVersion < pBlockIdx->info.maxVersion) pBlockIdx->info.maxVersion = pBlock->info.maxVersion; - - tBlockReset(pBlock); - tBlockDataReset(&pCommitter->nBlockData); - } - -_exit: - return code; - -_err: - tsdbError("vgId:%d commit memory data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) { - int32_t nRow = 0; - TSDBROW *pRow; - TSDBKEY key; - int32_t c = 0; - STbDataIter iter = *pIter; - - iter.pRow = NULL; - while (true) { - pRow = tsdbTbDataIterGet(pIter); - - if (pRow == NULL) break; - key = tsdbRowKey(pRow); - - c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock); - if (c == 0) { - nRow++; - } else if (c > 0) { - break; - } else { - ASSERT(0); - } - } - - return nRow; -} - -static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock, - int8_t toDataOnly) { - int32_t code = 0; - int32_t iRow = 0; - int32_t nRow = 0; - int32_t c; - TSDBROW *pRow; - SBlock block = tBlockInit(); - TSDBKEY key1; - TSDBKEY key2; - - tBlockDataReset(&pCommitter->nBlockData); - - // load last and merge until {pCommitter->maxKey, INT64_MAX} - code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, 0, NULL, NULL); - if (code) goto _err; +// static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, TSDBKEY eKey, +// bool toDataOnly) { +// int32_t code = 0; +// TSDBROW *pRow; +// STSchema *pTSchema = NULL; // TODO +// TSDBKEY key; +// SBlock *pBlock = &pCommitter->nBlock; + +// if (pIter == NULL) goto _exit; + +// tBlockReset(pBlock); +// tBlockDataReset(&pCommitter->nBlockData); +// while (true) { +// pRow = tsdbTbDataIterGet(pIter); + +// if (pRow == NULL || tsdbKeyCmprFn(&(TSDBKEY){.ts = pRow->pTSRow->ts, .version = pRow->version}, &eKey) > 0) { +// if (pCommitter->nBlockData.nRow == 0) { +// break; +// } else { +// goto _write_block_data; +// } +// } + +// // update schema +// if (pTSchema == NULL || pTSchema->version != TSDBROW_SVERSION(pRow)) { +// // TODO +// // pTSchema = NULL; +// } + +// // append row +// code = tBlockDataAppendRow(&pCommitter->nBlockData, pRow, pTSchema); +// if (code) goto _err; + +// // update info +// key = tsdbRowKey(pRow); +// if (tsdbKeyCmprFn(&key, &pBlock->info.maxKey) > 0) pBlock->info.maxKey = key; +// if (tsdbKeyCmprFn(&key, &pBlock->info.minKey) < 0) pBlock->info.minKey = key; +// if (key.version > pBlock->info.maxVersion) pBlock->info.maxVersion = key.version; +// if (key.version < pBlock->info.minVerion) pBlock->info.minVerion = key.version; + +// // iter next +// tsdbTbDataIterNext(pIter); + +// // check write +// if (pCommitter->nBlockData.nRow < pCommitter->maxRow * 4 / 5) { +// continue; +// } + +// _write_block_data: +// if (!toDataOnly && pCommitter->nBlockData.nRow < pCommitter->minKey) { +// pCommitter->nBlock.last = 1; +// } else { +// pCommitter->nBlock.last = 0; +// } + +// code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, pBlock); +// if (code) goto _err; + +// code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); +// if (code) goto _err; + +// // update info +// if (tsdbKeyCmprFn(&pBlock->info.minKey, &pBlockIdx->info.minKey) < 0) pBlock->info.minKey = +// pBlockIdx->info.minKey; if (tsdbKeyCmprFn(&pBlock->info.maxKey, &pBlockIdx->info.maxKey) < 0) pBlock->info.maxKey +// = pBlockIdx->info.maxKey; if (pBlock->info.minVerion < pBlockIdx->info.minVerion) pBlockIdx->info.minVerion = +// pBlock->info.minVerion; if (pBlock->info.maxVersion < pBlockIdx->info.maxVersion) pBlockIdx->info.maxVersion = +// pBlock->info.maxVersion; + +// tBlockReset(pBlock); +// tBlockDataReset(&pCommitter->nBlockData); +// } + +// _exit: +// return code; + +// _err: +// tsdbError("vgId:%d commit memory data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +// return code; +// } + +// static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) { +// int32_t nRow = 0; +// TSDBROW *pRow; +// TSDBKEY key; +// int32_t c = 0; +// STbDataIter iter = *pIter; + +// iter.pRow = NULL; +// while (true) { +// pRow = tsdbTbDataIterGet(pIter); + +// if (pRow == NULL) break; +// key = tsdbRowKey(pRow); + +// c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock); +// if (c == 0) { +// nRow++; +// } else if (c > 0) { +// break; +// } else { +// ASSERT(0); +// } +// } + +// return nRow; +// } + +// static int32_t tsdbMergeCommitImpl(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock, +// int8_t toDataOnly) { +// int32_t code = 0; +// int32_t iRow = 0; +// int32_t nRow = 0; +// int32_t c; +// TSDBROW *pRow; +// SBlock block = tBlockInit(); +// TSDBKEY key1; +// TSDBKEY key2; + +// tBlockDataReset(&pCommitter->nBlockData); + +// // load last and merge until {pCommitter->maxKey, INT64_MAX} +// code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, 0, NULL, NULL); +// if (code) goto _err; + +// iRow = 0; +// nRow = pCommitter->oBlockData.nRow; +// pRow = tsdbTbDataIterGet(pIter); + +// while (true) { +// if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && (iRow >= nRow)) { +// if (pCommitter->nBlockData.nRow > 0) { +// goto _write_block_data; +// } else { +// break; +// } +// } + +// // TODO + +// _write_block_data: +// block.last = pCommitter->nBlockData.nRow < pCommitter->minRow ? 1 : 0; +// code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block); +// if (code) goto _err; + +// code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock); +// if (code) goto _err; +// } + +// tBlockReset(&block); +// tBlockDataReset(&pCommitter->nBlockData); + +// return code; + +// _err: +// tsdbError("vgId:%d merge commit impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +// return code; +// } + +// static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock, +// int8_t isLastBlock) { +// int32_t code = 0; +// TSDBROW *pRow; +// TSDBKEY key; +// int32_t c; + +// if (pBlock == NULL) { // (pIter && pBlock == NULL) +// key.ts = pCommitter->maxKey; +// key.version = INT64_MAX; +// code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0); +// if (code) goto _err; +// } else if (pBlock->last) { +// // merge +// code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0); +// if (code) goto _err; +// } else { // pBlock && pBlock->last == 0 && (pIter == NULL || pIter) +// // memory +// if (pIter) { +// key.ts = pBlock->info.minKey.ts; +// key.version = pBlock->info.minKey.version - 1; +// code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1); +// if (code) goto _err; +// } + +// // merge or move block +// pRow = tsdbTbDataIterGet(pIter); +// key.ts = pRow->pTSRow->ts; +// key.version = pRow->version; + +// c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock); +// if (c > 0) { +// // move block +// code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); +// if (code) goto _err; +// } else if (c == 0) { +// int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock); + +// if (pBlock->nRow + nOverlap > pCommitter->maxRow || pBlock->nSubBlock == TSDB_MAX_SUBBLOCKS) { +// code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 1); +// if (code) goto _err; +// } else { +// // add as a subblock +// } +// } else { +// ASSERT(0); +// } +// } + +// return code; + +// _err: +// tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +// return code; +// } + +// static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { +// int32_t code = 0; +// STbDataIter iter; +// STbDataIter *pIter = &iter; +// TSDBROW *pRow; +// int64_t suid; +// int64_t uid; +// SBlockIdx blockIdx; + +// // create iter +// if (pTbData) { +// suid = pTbData->suid; +// uid = pTbData->uid; +// tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter); +// } else { +// suid = pBlockIdx->suid; +// uid = pBlockIdx->uid; +// pIter = NULL; +// } + +// // check +// pRow = tsdbTbDataIterGet(pIter); +// if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit; + +// // start ================================ +// tMapDataReset(&pCommitter->oBlockMap); +// tBlockReset(&pCommitter->oBlock); +// tBlockDataReset(&pCommitter->oBlockData); +// if (pBlockIdx) { +// code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL); +// if (code) goto _err; +// } + +// blockIdx = tBlockIdxInit(suid, uid); +// tMapDataReset(&pCommitter->nBlockMap); +// tBlockReset(&pCommitter->nBlock); +// tBlockDataReset(&pCommitter->nBlockData); + +// // impl =============================== +// int32_t iBlock = 0; +// int32_t nBlock = pCommitter->oBlockMap.nItem; + +// // merge +// pRow = tsdbTbDataIterGet(pIter); +// while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) { +// tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock); +// code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, &pCommitter->oBlock, iBlock == (nBlock - 1)); +// if (code) goto _err; + +// pRow = tsdbTbDataIterGet(pIter); +// iBlock++; +// } + +// // mem +// pRow = tsdbTbDataIterGet(pIter); +// while (!ROW_END(pRow, pCommitter->maxKey)) { +// code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL, 0); +// if (code) goto _err; + +// pRow = tsdbTbDataIterGet(pIter); +// } + +// // disk +// while (iBlock < nBlock) { +// tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock); + +// code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, &pCommitter->oBlock, 0); +// if (code) goto _err; + +// iBlock++; +// } + +// // end =============================== +// code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &blockIdx); +// if (code) goto _err; + +// code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx); +// if (code) goto _err; + +// _exit: +// pRow = tsdbTbDataIterGet(pIter); +// if (pRow) { +// ASSERT(pRow->pTSRow->ts > pCommitter->maxKey); +// if (pCommitter->nextKey > pRow->pTSRow->ts) { +// pCommitter->nextKey = pRow->pTSRow->ts; +// } +// } + +// return code; + +// _err: +// tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +// return code; +// } - iRow = 0; - nRow = pCommitter->oBlockData.nRow; - pRow = tsdbTbDataIterGet(pIter); - - while (true) { - if ((pRow == NULL || pRow->pTSRow->ts > pCommitter->maxKey) && (iRow >= nRow)) { - if (pCommitter->nBlockData.nRow > 0) { - goto _write_block_data; - } else { - break; - } - } +static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { + int32_t code = 0; + STsdb *pTsdb = pCommitter->pTsdb; + SDFileSet *pRSet = NULL; + SDFileSet wSet; - // TODO + // memory + pCommitter->nextKey = TSKEY_MAX; - _write_block_data: - block.last = pCommitter->nBlockData.nRow < pCommitter->minRow ? 1 : 0; - code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->nBlockData, NULL, NULL, pBlockIdx, &block); + // old + tMapDataReset(&pCommitter->oBlockIdxMap); + tMapDataReset(&pCommitter->oBlockMap); + tBlockReset(&pCommitter->oBlock); + tBlockDataReset(&pCommitter->oBlockData); + pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid); + if (pRSet) { + code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); if (code) goto _err; - code = tMapDataPutItem(&pCommitter->nBlockMap, &block, tPutBlock); + code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdxMap, NULL); if (code) goto _err; } - tBlockReset(&block); + // new + tMapDataReset(&pCommitter->nBlockIdxMap); + tMapDataReset(&pCommitter->nBlockMap); + tBlockReset(&pCommitter->nBlock); tBlockDataReset(&pCommitter->nBlockData); + if (pRSet) { + wSet = (SDFileSet){.diskId = pRSet->diskId, + .fid = pCommitter->commitFid, + .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, + .fData = pRSet->fData, + .fLast = {.commitID = pCommitter->commitID, .size = 0}, + .fSma = pRSet->fSma}; + } else { + STfs *pTfs = pTsdb->pVnode->pTfs; + SDiskID did = {.level = 0, .id = 0}; - return code; - -_err: - tsdbError("vgId:%d merge commit impl failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock, - int8_t isLastBlock) { - int32_t code = 0; - TSDBROW *pRow; - TSDBKEY key; - int32_t c; - - if (pBlock == NULL) { // (pIter && pBlock == NULL) - key.ts = pCommitter->maxKey; - key.version = INT64_MAX; - code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 0); - if (code) goto _err; - } else if (pBlock->last) { - // merge - code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 0); - if (code) goto _err; - } else { // pBlock && pBlock->last == 0 && (pIter == NULL || pIter) - // memory - if (pIter) { - key.ts = pBlock->info.minKey.ts; - key.version = pBlock->info.minKey.version - 1; - code = tsdbCommitMemoryData(pCommitter, pBlockIdx, pIter, key, 1); - if (code) goto _err; - } - - // merge or move block - pRow = tsdbTbDataIterGet(pIter); - key.ts = pRow->pTSRow->ts; - key.version = pRow->version; + // TODO: alloc a new disk + // tfsAllocDisk(pTfs, 0, &did); - c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock); - if (c > 0) { - // move block - code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock); - if (code) goto _err; - } else if (c == 0) { - int32_t nOverlap = tsdbGetOverlapRowNumber(pIter, pBlock); + // create the directory + tfsMkdirRecurAt(pTfs, pTsdb->path, did); - if (pBlock->nRow + nOverlap > pCommitter->maxRow || pBlock->nSubBlock == TSDB_MAX_SUBBLOCKS) { - code = tsdbMergeCommitImpl(pCommitter, pBlockIdx, pIter, pBlock, 1); - if (code) goto _err; - } else { - // add as a subblock - } - } else { - ASSERT(0); - } + wSet = (SDFileSet){.diskId = did, + .fid = pCommitter->commitFid, + .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, + .fData = {.commitID = pCommitter->commitID, .size = 0}, + .fLast = {.commitID = pCommitter->commitID, .size = 0}, + .fSma = {.commitID = pCommitter->commitID, .size = 0}}; } + code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &wSet); + if (code) goto _err; +_exit: return code; _err: - tsdbError("vgId:%d merge commit failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } -static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) { +static int32_t tsdbCommitMemoryData(SCommitter *pCommitter, STbData *pTbData) { int32_t code = 0; - STbDataIter iter; - STbDataIter *pIter = &iter; + STsdb *pTsdb = pCommitter->pTsdb; + STbDataIter *pIter = &(STbDataIter){0}; + TSDBKEY key = {.ts = pCommitter->minKey, .version = VERSION_MIN}; TSDBROW *pRow; - int64_t suid; - int64_t uid; - SBlockIdx blockIdx; // create iter - if (pTbData) { - suid = pTbData->suid; - uid = pTbData->uid; - tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter); - } else { - suid = pBlockIdx->suid; - uid = pBlockIdx->uid; - pIter = NULL; - } - - // check + tsdbTbDataIterOpen(pTbData, &key, 0, pIter); pRow = tsdbTbDataIterGet(pIter); - if (ROW_END(pRow, pCommitter->maxKey) && pBlockIdx == NULL) goto _exit; - // start ================================ - tMapDataReset(&pCommitter->oBlockMap); - tBlockReset(&pCommitter->oBlock); - tBlockDataReset(&pCommitter->oBlockData); - if (pBlockIdx) { - code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL); - if (code) goto _err; - } - - blockIdx = tBlockIdxInit(suid, uid); - tMapDataReset(&pCommitter->nBlockMap); - tBlockReset(&pCommitter->nBlock); - tBlockDataReset(&pCommitter->nBlockData); + if (pRow == NULL || tsdbRowKey(pRow).ts > pCommitter->maxKey) goto _exit; - // impl =============================== - int32_t iBlock = 0; - int32_t nBlock = pCommitter->oBlockMap.nItem; - - // merge - pRow = tsdbTbDataIterGet(pIter); - while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock); - code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, &pCommitter->oBlock, iBlock == (nBlock - 1)); - if (code) goto _err; - - pRow = tsdbTbDataIterGet(pIter); - iBlock++; - } + // main loop + SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pTbData->suid, .uid = pTbData->uid}; + SBlock *pBlock = &pCommitter->nBlock; + SBlockData *pBlockData = &pCommitter->nBlockData; - // mem - pRow = tsdbTbDataIterGet(pIter); - while (!ROW_END(pRow, pCommitter->maxKey)) { - code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL, 0); + tBlockIdxReset(pBlockIdx); + tBlockReset(pBlock); + tBlockDataReset(pBlockData); + while (pRow != NULL && tsdbRowKey(pRow).ts <= pCommitter->maxKey) { + code = tBlockDataAppendRow(pBlockData, pRow, NULL); if (code) goto _err; + tsdbTbDataIterNext(pIter); pRow = tsdbTbDataIterGet(pIter); - } - // disk - while (iBlock < nBlock) { - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock); - - code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, &pCommitter->oBlock, 0); - if (code) goto _err; - - iBlock++; + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + // write the block and do something + } } - // end =============================== - code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, &blockIdx); - if (code) goto _err; - - code = tMapDataPutItem(&pCommitter->nBlockIdxMap, &blockIdx, tPutBlockIdx); - if (code) goto _err; - _exit: - pRow = tsdbTbDataIterGet(pIter); if (pRow) { - ASSERT(pRow->pTSRow->ts > pCommitter->maxKey); - if (pCommitter->nextKey > pRow->pTSRow->ts) { - pCommitter->nextKey = pRow->pTSRow->ts; - } + pCommitter->nextKey = TMIN(pCommitter->nextKey, tsdbRowKey(pRow).ts); } - return code; _err: - tsdbError("vgId:%d commit Table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { - int32_t code = 0; - STsdb *pTsdb = pCommitter->pTsdb; - SDFileSet *pRSet = NULL; - SDFileSet *pWSet = NULL; - - // memory - pCommitter->nextKey = TSKEY_MAX; - - // old - tMapDataReset(&pCommitter->oBlockIdxMap); - tMapDataReset(&pCommitter->oBlockMap); - tBlockReset(&pCommitter->oBlock); - tBlockDataReset(&pCommitter->oBlockData); - pRSet = tsdbFSStateGetDFileSet(pTsdb->fs->nState, pCommitter->commitFid); - if (pRSet) { - code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet); - if (code) goto _err; - - code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdxMap, NULL); - if (code) goto _err; - } - - // new - tMapDataReset(&pCommitter->nBlockIdxMap); - tMapDataReset(&pCommitter->nBlockMap); - tBlockReset(&pCommitter->nBlock); - tBlockDataReset(&pCommitter->nBlockData); - if (pRSet) { - pWSet = &(SDFileSet){.diskId = pRSet->diskId, - .fid = pCommitter->commitFid, - .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, - .fData = pRSet->fData, - .fLast = {.commitID = pCommitter->commitID, .size = 0}, - .fSma = pRSet->fSma}; - } else { - SDiskID did = {.level = 0, .id = 0}; // TODO: alloc a new one - pWSet = &(SDFileSet){.diskId = did, - .fid = pCommitter->commitFid, - .fHead = {.commitID = pCommitter->commitID, .offset = 0, .size = 0}, - .fData = {.commitID = pCommitter->commitID, .size = 0}, - .fLast = {.commitID = pCommitter->commitID, .size = 0}, - .fSma = {.commitID = pCommitter->commitID, .size = 0}}; - } - code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, pWSet); - if (code) goto _err; - -_exit: - return code; - -_err: - tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb commit memory data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -711,8 +764,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { int32_t iBlockIdx = 0; int32_t nBlockIdx = pCommitter->oBlockIdxMap.nItem; STbData *pTbData; - SBlockIdx blockIdx; - SBlockIdx *pBlockIdx = &blockIdx; + SBlockIdx *pBlockIdx = &(SBlockIdx){0}; ASSERT(nTbData > 0); @@ -723,6 +775,81 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { pBlockIdx = NULL; } + // merge + while (pTbData && pBlockIdx) { + c = tTABLEIDCmprFn(pTbData, pBlockIdx); + + if (c == 0) { + // merge commit + // code = tsdbMergeCommit(pCommitter, pTbData, pBlockIdx); + // if (code) goto _err; + + iTbData++; + iBlockIdx++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { + pTbData = NULL; + } + if (iBlockIdx < nBlockIdx) { + tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); + } else { + pBlockIdx = NULL; + } + } else if (c < 0) { + // commit memory data + // code = tsdbCommitMemoryData(pCommitter, pTbData); + // if (code) goto _err; + + iTbData++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { + pTbData = NULL; + } + } else { + // commit disk data + // code = tsdbCommitDiskData(pCommitter, pBlockIdx); + // if (code) goto _err; + + iBlockIdx++; + if (iBlockIdx < nBlockIdx) { + tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); + } else { + pBlockIdx = NULL; + } + } + } + + // disk + while (pBlockIdx) { + // commit disk data + // code = tsdbCommitDiskData(pCommitter, pBlockIdx); + // if (code) goto _err; + + iBlockIdx++; + if (iBlockIdx < nBlockIdx) { + tMapDataGetItemByIdx(&pCommitter->oBlockIdxMap, iBlockIdx, pBlockIdx, tGetBlockIdx); + } else { + pBlockIdx = NULL; + } + } + + // memory + while (pTbData) { + // commit memory data + code = tsdbCommitMemoryData(pCommitter, pTbData); + if (code) goto _err; + + iTbData++; + if (iTbData < nTbData) { + pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); + } else { + pTbData = NULL; + } + } + +#if 0 while (true) { if (pTbData == NULL && pBlockIdx == NULL) break; @@ -784,6 +911,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { } continue; } +#endif return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index d48000231cb89fd80ac1596555d230977141e494..b322a9833370a17e9414eec1d31d3551a148a19f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -607,12 +607,12 @@ int32_t tsdbReadBlockSMA(SDataFReader *pReader, SBlockSMA *pBlkSMA) { // SDataFWriter ==================================================== struct SDataFWriter { - STsdb *pTsdb; - SDFileSet *pSet; - TdFilePtr pHeadFD; - TdFilePtr pDataFD; - TdFilePtr pLastFD; - TdFilePtr pSmaFD; + STsdb *pTsdb; + SDFileSet wSet; + TdFilePtr pHeadFD; + TdFilePtr pDataFD; + TdFilePtr pLastFD; + TdFilePtr pSmaFD; }; int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { @@ -630,9 +630,8 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS goto _err; } pWriter->pTsdb = pTsdb; - pWriter->pSet = pSet; - - // create the directory if not there + pWriter->wSet = *pSet; + pSet = &pWriter->wSet; // head flag = TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; @@ -809,10 +808,10 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) { int64_t size = TSDB_FHDR_SIZE; int64_t n; uint8_t *pBuf = NULL; - SHeadFile *pHeadFile = &pWriter->pSet->fHead; - SDataFile *pDataFile = &pWriter->pSet->fData; - SLastFile *pLastFile = &pWriter->pSet->fLast; - SSmaFile *pSmaFile = &pWriter->pSet->fSma; + SHeadFile *pHeadFile = &pWriter->wSet.fHead; + SDataFile *pDataFile = &pWriter->wSet.fData; + SLastFile *pLastFile = &pWriter->wSet.fLast; + SSmaFile *pSmaFile = &pWriter->wSet.fSma; // alloc if (!ppBuf) ppBuf = &pBuf; @@ -904,7 +903,7 @@ _err: int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **ppBuf) { int32_t code = 0; int64_t size = 0; - SHeadFile *pHeadFile = &pWriter->pSet->fHead; + SHeadFile *pHeadFile = &pWriter->wSet.fHead; int64_t n = 0; uint8_t *pBuf = NULL; @@ -946,7 +945,7 @@ _err: int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) { int32_t code = 0; - SHeadFile *pHeadFile = &pWriter->pSet->fHead; + SHeadFile *pHeadFile = &pWriter->wSet.fHead; uint8_t *pBuf = NULL; int64_t size; int64_t n; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 1b93188afd7ab5e89a17071769e5fcc24a0fbf4a..be03ff89ca6a68c7f5037dcdae495d798809ec73 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -277,13 +277,25 @@ static FORCE_INLINE int32_t tGetTSDBKEY(uint8_t *p, TSDBKEY *pKey) { } // SBlockIdx ====================================================== +void tBlockIdxReset(SBlockIdx *pBlockIdx) { + pBlockIdx->minKey = TSKEY_MAX; + pBlockIdx->maxKey = TSKEY_MIN; + pBlockIdx->minVersion = VERSION_MAX; + pBlockIdx->maxVersion = VERSION_MIN; + pBlockIdx->offset = -1; + pBlockIdx->size = -1; +} + int32_t tPutBlockIdx(uint8_t *p, void *ph) { int32_t n = 0; SBlockIdx *pBlockIdx = (SBlockIdx *)ph; n += tPutI64(p ? p + n : p, pBlockIdx->suid); n += tPutI64(p ? p + n : p, pBlockIdx->uid); - n += tPutKEYINFO(p ? p + n : p, &pBlockIdx->info); + n += tPutI64(p ? p + n : p, pBlockIdx->minKey); + n += tPutI64(p ? p + n : p, pBlockIdx->maxKey); + n += tPutI64v(p ? p + n : p, pBlockIdx->minVersion); + n += tPutI64v(p ? p + n : p, pBlockIdx->maxVersion); n += tPutI64v(p ? p + n : p, pBlockIdx->offset); n += tPutI64v(p ? p + n : p, pBlockIdx->size); @@ -296,7 +308,10 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) { n += tGetI64(p + n, &pBlockIdx->suid); n += tGetI64(p + n, &pBlockIdx->uid); - n += tGetKEYINFO(p + n, &pBlockIdx->info); + n += tGetI64(p + n, &pBlockIdx->minKey); + n += tGetI64(p + n, &pBlockIdx->maxKey); + n += tGetI64v(p + n, &pBlockIdx->minVersion); + n += tGetI64v(p + n, &pBlockIdx->maxVersion); n += tGetI64v(p + n, &pBlockIdx->offset); n += tGetI64v(p + n, &pBlockIdx->size); @@ -305,7 +320,10 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) { // SBlock ====================================================== void tBlockReset(SBlock *pBlock) { - pBlock->info = tKEYINFOInit(); + pBlock->minKey = TSDBKEY_MAX; + pBlock->maxKey = TSDBKEY_MIN; + pBlock->minVersion = VERSION_MAX; + pBlock->maxVersion = VERSION_MIN; pBlock->nRow = 0; pBlock->last = -1; pBlock->cmprAlg = -1; @@ -328,7 +346,10 @@ int32_t tPutBlock(uint8_t *p, void *ph) { int32_t n = 0; SBlock *pBlock = (SBlock *)ph; - n += tPutKEYINFO(p ? p + n : p, &pBlock->info); + n += tPutTSDBKEY(p ? p + n : p, &pBlock->minKey); + n += tPutTSDBKEY(p ? p + n : p, &pBlock->maxKey); + n += tPutI64v(p ? p + n : p, pBlock->minVersion); + n += tPutI64v(p ? p + n : p, pBlock->maxVersion); n += tPutI32v(p ? p + n : p, pBlock->nRow); n += tPutI8(p ? p + n : p, pBlock->last); n += tPutI8(p ? p + n : p, pBlock->hasDup); @@ -348,7 +369,10 @@ int32_t tGetBlock(uint8_t *p, void *ph) { int32_t n = 0; SBlock *pBlock = (SBlock *)ph; - n += tGetKEYINFO(p + n, &pBlock->info); + n += tGetTSDBKEY(p + n, &pBlock->minKey); + n += tGetTSDBKEY(p + n, &pBlock->maxKey); + n += tGetI64v(p + n, &pBlock->minVersion); + n += tGetI64v(p + n, &pBlock->maxVersion); n += tGetI32v(p + n, &pBlock->nRow); n += tGetI8(p + n, &pBlock->last); n += tGetI8(p + n, &pBlock->hasDup); @@ -369,9 +393,9 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) { SBlock *pBlock1 = (SBlock *)p1; SBlock *pBlock2 = (SBlock *)p2; - if (tsdbKeyCmprFn(&pBlock1->info.maxKey, &pBlock2->info.minKey) < 0) { + if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) { return -1; - } else if (tsdbKeyCmprFn(&pBlock1->info.minKey, &pBlock2->info.maxKey) > 0) { + } else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) { return 1; }