From 7d3a57f568b5ee13af5c8de87fb16f8a1253ed6c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 8 Jan 2022 06:50:52 +0000 Subject: [PATCH] more --- include/dnode/vnode/tsdb/tsdb.h | 1 + source/dnode/vnode/tsdb/src/tsdbCommit.c | 1674 +++++++++++----------- 2 files changed, 828 insertions(+), 847 deletions(-) diff --git a/include/dnode/vnode/tsdb/tsdb.h b/include/dnode/vnode/tsdb/tsdb.h index c19152de44..6751091f24 100644 --- a/include/dnode/vnode/tsdb/tsdb.h +++ b/include/dnode/vnode/tsdb/tsdb.h @@ -54,6 +54,7 @@ typedef struct STsdbCfg { int32_t keep1; int32_t keep2; int8_t update; + int8_t compression; } STsdbCfg; // STsdb diff --git a/source/dnode/vnode/tsdb/src/tsdbCommit.c b/source/dnode/vnode/tsdb/src/tsdbCommit.c index c16d10414e..81128ea9f3 100644 --- a/source/dnode/vnode/tsdb/src/tsdbCommit.c +++ b/source/dnode/vnode/tsdb/src/tsdbCommit.c @@ -62,6 +62,37 @@ static void tsdbDestroyCommitH(SCommitH *pCommith); static int tsdbCreateCommitIters(SCommitH *pCommith); static void tsdbDestroyCommitIters(SCommitH *pCommith); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +static void tsdbResetCommitFile(SCommitH *pCommith); +static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +// static int tsdbCommitMeta(STsdbRepo *pRepo); +// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact); +// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); +// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); +// static void tsdbStartCommit(STsdbRepo *pRepo); +// static void tsdbEndCommit(STsdbRepo *pRepo, int eno); +// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +// static int tsdbCreateCommitIters(SCommitH *pCommith); +// static void tsdbDestroyCommitIters(SCommitH *pCommith); +// static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); +// static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo); +// static void tsdbDestroyCommitH(SCommitH *pCommith); +// static int tsdbGetFidLevel(int fid, SRtn *pRtn); +// static int tsdbNextCommitFid(SCommitH *pCommith); +static int tsdbCommitToTable(SCommitH *pCommith, int tid); +static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); +static int tsdbComparKeyBlock(const void *arg1, const void *arg2); +// static int tsdbWriteBlockInfo(SCommitH *pCommih); +// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); +// static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); +static int tsdbMoveBlock(SCommitH *pCommith, int bidx); +static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); +static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, + bool isLastOneBlock); +static void tsdbResetCommitTable(SCommitH *pCommith); +static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); +// static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); +// static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, +// TSKEY maxKey, int maxRows, int8_t update); int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { SDiskID did; @@ -141,7 +172,6 @@ int tsdbCommit(STsdb *pRepo) { // Loop to commit to each file fid = tsdbNextCommitFid(&(commith)); -#if 0 while (true) { // Loop over both on disk and memory if (pSet == NULL && fid == TSDB_IVLD_FID) break; @@ -179,7 +209,6 @@ int tsdbCommit(STsdb *pRepo) { fid = tsdbNextCommitFid(&commith); } } -#endif tsdbDestroyCommitH(&commith); tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); @@ -314,61 +343,62 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } -// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { -// STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); -// STsdbCfg *pCfg = REPO_CFG(pRepo); +static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { + STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); -// ASSERT(pSet == NULL || pSet->fid == fid); + ASSERT(pSet == NULL || pSet->fid == fid); -// tsdbResetCommitFile(pCommith); -// tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); + tsdbResetCommitFile(pCommith); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); -// // Set and open files -// if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { -// return -1; -// } + // Set and open files + if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { + return -1; + } -// // Loop to commit each table data -// for (int tid = 1; tid < pCommith->niters; tid++) { -// SCommitIter *pIter = pCommith->iters + tid; + // Loop to commit each table data + for (int tid = 1; tid < pCommith->niters; tid++) { + SCommitIter *pIter = pCommith->iters + tid; -// if (pIter->pTable == NULL) continue; + if (pIter->pTable == NULL) continue; -// if (tsdbCommitToTable(pCommith, tid) < 0) { -// tsdbCloseCommitFile(pCommith, true); -// // revert the file change -// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); -// return -1; -// } -// } + if (tsdbCommitToTable(pCommith, tid) < 0) { + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } + } +#if 0 -// if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) -// < -// 0) { -// tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); -// tsdbCloseCommitFile(pCommith, true); -// // revert the file change -// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); -// return -1; -// } + if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < + 0) { + tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } -// if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) { -// tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); -// tsdbCloseCommitFile(pCommith, true); -// // revert the file change -// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); -// return -1; -// } + if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) { + tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } -// // Close commit file -// tsdbCloseCommitFile(pCommith, false); + // Close commit file + tsdbCloseCommitFile(pCommith, false); -// if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { -// return -1; -// } + if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { + return -1; + } -// return 0; -// } +#endif + return 0; +} static int tsdbCreateCommitIters(SCommitH *pCommith) { STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); @@ -416,479 +446,569 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { pCommith->niters = 0; } -#if 0 -#include "tsdbint.h" +static void tsdbResetCommitFile(SCommitH *pCommith) { + pCommith->isRFileSet = false; + pCommith->isDFileSame = false; + pCommith->isLFileSame = false; + taosArrayClear(pCommith->aBlkIdx); +} -extern int32_t tsTsdbMetaCompactRatio; +static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { + SDiskID did; + STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); + SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); + tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id)); + if (did.level == TFS_UNDECIDED_LEVEL) { + terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + return -1; + } -static int tsdbCommitMeta(STsdbRepo *pRepo); -static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact); -static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); -static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); -static void tsdbStartCommit(STsdbRepo *pRepo); -static void tsdbEndCommit(STsdbRepo *pRepo, int eno); -static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); -static int tsdbCreateCommitIters(SCommitH *pCommith); -static void tsdbDestroyCommitIters(SCommitH *pCommith); -static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); -static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo); -static void tsdbDestroyCommitH(SCommitH *pCommith); -static int tsdbGetFidLevel(int fid, SRtn *pRtn); -static int tsdbNextCommitFid(SCommitH *pCommith); -static int tsdbCommitToTable(SCommitH *pCommith, int tid); -static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); -static int tsdbComparKeyBlock(const void *arg1, const void *arg2); -static int tsdbWriteBlockInfo(SCommitH *pCommih); -static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); -static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); -static int tsdbMoveBlock(SCommitH *pCommith, int bidx); -static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); -static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, - bool isLastOneBlock); -static void tsdbResetCommitFile(SCommitH *pCommith); -static void tsdbResetCommitTable(SCommitH *pCommith); -static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); -static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); -static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); -static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, - TSKEY maxKey, int maxRows, int8_t update); + // Open read FSET + if (pSet) { + if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) { + return -1; + } + pCommith->isRFileSet = true; -int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, - SBlockIdx *pIdx) { - size_t nSupBlocks; - size_t nSubBlocks; - uint32_t tlen; - SBlockInfo *pBlkInfo; - int64_t offset; - SBlock * pBlock; + if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } - memset(pIdx, 0, sizeof(*pIdx)); + tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet), + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); + } else { + pCommith->isRFileSet = false; + } - nSupBlocks = taosArrayGetSize(pSupA); - nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); + // Set and open commit FSET + if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) { + // Create a new FSET to write data + tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo))); - if (nSupBlocks <= 0) { - // No data (data all deleted) - return 0; - } + if (tsdbCreateDFileSet(pWSet, true) < 0) { + tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo), + TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno)); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + } + return -1; + } - tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); - if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; - pBlkInfo = *ppBuf; + pCommith->isDFileSame = false; + pCommith->isLFileSame = false; - pBlkInfo->delimiter = TSDB_FILE_DELIMITER; - pBlkInfo->tid = TABLE_TID(pTable); - pBlkInfo->uid = TABLE_UID(pTable); + tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), + TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet)); + } else { + did.level = TSDB_FSET_LEVEL(pSet); + did.id = TSDB_FSET_ID(pSet); - memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); - if (nSubBlocks > 0) { - memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); + pCommith->wSet.fid = fid; + pCommith->wSet.state = 0; - for (int i = 0; i < nSupBlocks; i++) { - pBlock = pBlkInfo->blocks + i; + // TSDB_FILE_HEAD + SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); + tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); + if (tsdbCreateDFile(pWHeadf, true) < 0) { + tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf), + tstrerror(terrno)); - if (pBlock->numOfSubBlocks > 1) { - pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; } } - } - taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); + // TSDB_FILE_DATA + SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); + SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); + tsdbInitDFileEx(pWDataf, pRDataf); + if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf), + tstrerror(terrno)); - if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { - return -1; - } + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + pCommith->isDFileSame = true; + + // TSDB_FILE_LAST + SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh)); + SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith); + if (pRLastf->info.size < 32 * 1024) { + tsdbInitDFileEx(pWLastf, pRLastf); + pCommith->isLFileSame = true; + + if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), + tstrerror(terrno)); - tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + } else { + tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); + pCommith->isLFileSame = false; - // Set pIdx - pBlock = taosArrayGetLast(pSupA); + if (tsdbCreateDFile(pWLastf, true) < 0) { + tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), + tstrerror(terrno)); - pIdx->tid = TABLE_TID(pTable); - pIdx->uid = TABLE_UID(pTable); - pIdx->hasLast = pBlock->last ? 1 : 0; - pIdx->maxKey = pBlock->keyLast; - pIdx->numOfBlocks = (uint32_t)nSupBlocks; - pIdx->len = tlen; - pIdx->offset = (uint32_t)offset; + tsdbCloseDFileSet(pWSet); + (void)tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + } + } return 0; } -int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { - SBlockIdx *pBlkIdx; - size_t nidx = taosArrayGetSize(pIdxA); - int tlen = 0, size; - int64_t offset; +// extern int32_t tsTsdbMetaCompactRatio; - if (nidx <= 0) { - // All data are deleted - pHeadf->info.offset = 0; - pHeadf->info.len = 0; - return 0; - } +// int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, +// SBlockIdx *pIdx) { +// size_t nSupBlocks; +// size_t nSubBlocks; +// uint32_t tlen; +// SBlockInfo *pBlkInfo; +// int64_t offset; +// SBlock * pBlock; - for (size_t i = 0; i < nidx; i++) { - pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); +// memset(pIdx, 0, sizeof(*pIdx)); - size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); - if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; +// nSupBlocks = taosArrayGetSize(pSupA); +// nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); - void *ptr = POINTER_SHIFT(*ppBuf, tlen); - tsdbEncodeSBlockIdx(&ptr, pBlkIdx); +// if (nSupBlocks <= 0) { +// // No data (data all deleted) +// return 0; +// } - tlen += size; - } +// tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); +// if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; +// pBlkInfo = *ppBuf; - tlen += sizeof(TSCKSUM); - if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; - taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); +// pBlkInfo->delimiter = TSDB_FILE_DELIMITER; +// pBlkInfo->tid = TABLE_TID(pTable); +// pBlkInfo->uid = TABLE_UID(pTable); - if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { - return -1; - } +// memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); +// if (nSubBlocks > 0) { +// memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); - tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); - pHeadf->info.offset = (uint32_t)offset; - pHeadf->info.len = tlen; +// for (int i = 0; i < nSupBlocks; i++) { +// pBlock = pBlkInfo->blocks + i; - return 0; -} +// if (pBlock->numOfSubBlocks > 1) { +// pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); +// } +// } +// } +// taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); -// =================== Commit Meta Data -static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) { - STsdbFS * pfs = REPO_FS(pRepo); - SMFile * pOMFile = pfs->cstatus->pmf; - SDiskID did; +// if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { +// return -1; +// } - // Create/Open a meta file or open the existing file - if (pOMFile == NULL) { - // Create a new meta file - did.level = TFS_PRIMARY_LEVEL; - did.id = TFS_PRIMARY_ID; - tsdbInitMFile(pMf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); +// tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); - if (open && tsdbCreateMFile(pMf, true) < 0) { - tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } +// // Set pIdx +// pBlock = taosArrayGetLast(pSupA); - tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMf)); - } else { - tsdbInitMFileEx(pMf, pOMFile); - if (open && tsdbOpenMFile(pMf, O_WRONLY) < 0) { - tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } - } +// pIdx->tid = TABLE_TID(pTable); +// pIdx->uid = TABLE_UID(pTable); +// pIdx->hasLast = pBlock->last ? 1 : 0; +// pIdx->maxKey = pBlock->keyLast; +// pIdx->numOfBlocks = (uint32_t)nSupBlocks; +// pIdx->len = tlen; +// pIdx->offset = (uint32_t)offset; - return 0; -} +// return 0; +// } -static int tsdbCommitMeta(STsdbRepo *pRepo) { - STsdbFS * pfs = REPO_FS(pRepo); - SMemTable *pMem = pRepo->imem; - SMFile * pOMFile = pfs->cstatus->pmf; - SMFile mf; - SActObj * pAct = NULL; - SActCont * pCont = NULL; - SListNode *pNode = NULL; - - ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0); - - if (listNEles(pMem->actList) <= 0) { - // no meta data to commit, just keep the old meta file - tsdbUpdateMFile(pfs, pOMFile); - if (tsTsdbMetaCompactRatio > 0) { - if (tsdbInitCommitMetaFile(pRepo, &mf, false) < 0) { - return -1; - } - int ret = tsdbCompactMetaFile(pRepo, pfs, &mf); - if (ret < 0) tsdbError("compact meta file error"); +// int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { +// SBlockIdx *pBlkIdx; +// size_t nidx = taosArrayGetSize(pIdxA); +// int tlen = 0, size; +// int64_t offset; + +// if (nidx <= 0) { +// // All data are deleted +// pHeadf->info.offset = 0; +// pHeadf->info.len = 0; +// return 0; +// } - return ret; - } - return 0; - } else { - if (tsdbInitCommitMetaFile(pRepo, &mf, true) < 0) { - return -1; - } - } +// for (size_t i = 0; i < nidx; i++) { +// pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); - // Loop to write - while ((pNode = tdListPopHead(pMem->actList)) != NULL) { - pAct = (SActObj *)pNode->data; - if (pAct->act == TSDB_UPDATE_META) { - pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); - if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, false) < 0) { - tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tsdbCloseMFile(&mf); - (void)tsdbApplyMFileChange(&mf, pOMFile); - // TODO: need to reload metaCache - return -1; - } - } else if (pAct->act == TSDB_DROP_META) { - if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) { - tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tsdbCloseMFile(&mf); - tsdbApplyMFileChange(&mf, pOMFile); - // TODO: need to reload metaCache - return -1; - } - } else { - ASSERT(false); - } - } +// size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); +// if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; - if (tsdbUpdateMFileHeader(&mf) < 0) { - tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); - tsdbApplyMFileChange(&mf, pOMFile); - // TODO: need to reload metaCache - return -1; - } +// void *ptr = POINTER_SHIFT(*ppBuf, tlen); +// tsdbEncodeSBlockIdx(&ptr, pBlkIdx); - TSDB_FILE_FSYNC(&mf); - tsdbCloseMFile(&mf); - tsdbUpdateMFile(pfs, &mf); +// tlen += size; +// } - if (tsTsdbMetaCompactRatio > 0 && tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) { - tsdbError("compact meta file error"); - } +// tlen += sizeof(TSCKSUM); +// if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; +// taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); - return 0; -} +// if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { +// return -1; +// } -int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) { - int tlen = 0; - tlen += taosEncodeFixedU64(buf, pRecord->uid); - tlen += taosEncodeFixedI64(buf, pRecord->offset); - tlen += taosEncodeFixedI64(buf, pRecord->size); +// tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); +// pHeadf->info.offset = (uint32_t)offset; +// pHeadf->info.len = tlen; - return tlen; -} +// return 0; +// } -void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { - buf = taosDecodeFixedU64(buf, &(pRecord->uid)); - buf = taosDecodeFixedI64(buf, &(pRecord->offset)); - buf = taosDecodeFixedI64(buf, &(pRecord->size)); +// // =================== Commit Meta Data +// static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) { +// STsdbFS * pfs = REPO_FS(pRepo); +// SMFile * pOMFile = pfs->cstatus->pmf; +// SDiskID did; + +// // Create/Open a meta file or open the existing file +// if (pOMFile == NULL) { +// // Create a new meta file +// did.level = TFS_PRIMARY_LEVEL; +// did.id = TFS_PRIMARY_ID; +// tsdbInitMFile(pMf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); + +// if (open && tsdbCreateMFile(pMf, true) < 0) { +// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } - return buf; -} +// tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMf)); +// } else { +// tsdbInitMFileEx(pMf, pOMFile); +// if (open && tsdbOpenMFile(pMf, O_WRONLY) < 0) { +// tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } +// } -static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) { - char buf[64] = "\0"; - void * pBuf = buf; - SKVRecord rInfo; - int64_t offset; +// return 0; +// } - // Seek to end of meta file - offset = tsdbSeekMFile(pMFile, 0, SEEK_END); - if (offset < 0) { - return -1; - } +// static int tsdbCommitMeta(STsdbRepo *pRepo) { +// STsdbFS * pfs = REPO_FS(pRepo); +// SMemTable *pMem = pRepo->imem; +// SMFile * pOMFile = pfs->cstatus->pmf; +// SMFile mf; +// SActObj * pAct = NULL; +// SActCont * pCont = NULL; +// SListNode *pNode = NULL; + +// ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0); + +// if (listNEles(pMem->actList) <= 0) { +// // no meta data to commit, just keep the old meta file +// tsdbUpdateMFile(pfs, pOMFile); +// if (tsTsdbMetaCompactRatio > 0) { +// if (tsdbInitCommitMetaFile(pRepo, &mf, false) < 0) { +// return -1; +// } +// int ret = tsdbCompactMetaFile(pRepo, pfs, &mf); +// if (ret < 0) tsdbError("compact meta file error"); + +// return ret; +// } +// return 0; +// } else { +// if (tsdbInitCommitMetaFile(pRepo, &mf, true) < 0) { +// return -1; +// } +// } - rInfo.offset = offset; - rInfo.uid = uid; - rInfo.size = contLen; +// // Loop to write +// while ((pNode = tdListPopHead(pMem->actList)) != NULL) { +// pAct = (SActObj *)pNode->data; +// if (pAct->act == TSDB_UPDATE_META) { +// pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); +// if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, false) < 0) { +// tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, +// tstrerror(terrno)); +// tsdbCloseMFile(&mf); +// (void)tsdbApplyMFileChange(&mf, pOMFile); +// // TODO: need to reload metaCache +// return -1; +// } +// } else if (pAct->act == TSDB_DROP_META) { +// if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) { +// tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, +// tstrerror(terrno)); +// tsdbCloseMFile(&mf); +// tsdbApplyMFileChange(&mf, pOMFile); +// // TODO: need to reload metaCache +// return -1; +// } +// } else { +// ASSERT(false); +// } +// } - int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo); - if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) { - return -1; - } +// if (tsdbUpdateMFileHeader(&mf) < 0) { +// tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); +// tsdbApplyMFileChange(&mf, pOMFile); +// // TODO: need to reload metaCache +// return -1; +// } - if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) { - return -1; - } +// TSDB_FILE_FSYNC(&mf); +// tsdbCloseMFile(&mf); +// tsdbUpdateMFile(pfs, &mf); - tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); +// if (tsTsdbMetaCompactRatio > 0 && tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) { +// tsdbError("compact meta file error"); +// } - SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache; +// return 0; +// } - pMFile->info.nRecords++; +// int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) { +// int tlen = 0; +// tlen += taosEncodeFixedU64(buf, pRecord->uid); +// tlen += taosEncodeFixedI64(buf, pRecord->offset); +// tlen += taosEncodeFixedI64(buf, pRecord->size); - SKVRecord *pRecord = taosHashGet(cache, (void *)&uid, sizeof(uid)); - if (pRecord != NULL) { - pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord)); - } else { - pMFile->info.nRecords++; - } - taosHashPut(cache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo)); +// return tlen; +// } - return 0; -} +// void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { +// buf = taosDecodeFixedU64(buf, &(pRecord->uid)); +// buf = taosDecodeFixedI64(buf, &(pRecord->offset)); +// buf = taosDecodeFixedI64(buf, &(pRecord->size)); -static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { - SKVRecord rInfo = {0}; - char buf[128] = "\0"; +// return buf; +// } - SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid)); - if (pRecord == NULL) { - tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid); - return -1; - } +// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) { +// char buf[64] = "\0"; +// void * pBuf = buf; +// SKVRecord rInfo; +// int64_t offset; - rInfo.offset = -pRecord->offset; - rInfo.uid = pRecord->uid; - rInfo.size = pRecord->size; +// // Seek to end of meta file +// offset = tsdbSeekMFile(pMFile, 0, SEEK_END); +// if (offset < 0) { +// return -1; +// } - void *pBuf = buf; - tsdbEncodeKVRecord(&pBuf, &rInfo); +// rInfo.offset = offset; +// rInfo.uid = uid; +// rInfo.size = contLen; - if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) { - return -1; - } +// int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo); +// if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) { +// return -1; +// } - pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord)); - pMFile->info.nDels++; - pMFile->info.nRecords--; - pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); +// if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) { +// return -1; +// } - taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid)); - return 0; -} +// tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); -static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { - float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords); - float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size); - float compactRatio = (float)(tsTsdbMetaCompactRatio)/100; +// SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache; - if (delPercent < compactRatio && tombPercent < compactRatio) { - return 0; - } +// pMFile->info.nRecords++; - if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { - tsdbError("open meta file %s compact fail", pMFile->f.rname); - return -1; - } +// SKVRecord *pRecord = taosHashGet(cache, (void *)&uid, sizeof(uid)); +// if (pRecord != NULL) { +// pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord)); +// } else { +// pMFile->info.nRecords++; +// } +// taosHashPut(cache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo)); - tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 ",size:%" PRId64, - tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size); +// return 0; +// } - SMFile mf; - SDiskID did; +// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { +// SKVRecord rInfo = {0}; +// char buf[128] = "\0"; - // first create tmp meta file - did.level = TFS_PRIMARY_LEVEL; - did.id = TFS_PRIMARY_ID; - tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)) + 1); +// SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid)); +// if (pRecord == NULL) { +// tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid); +// return -1; +// } - if (tsdbCreateMFile(&mf, true) < 0) { - tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } +// rInfo.offset = -pRecord->offset; +// rInfo.uid = pRecord->uid; +// rInfo.size = pRecord->size; - tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); +// void *pBuf = buf; +// tsdbEncodeKVRecord(&pBuf, &rInfo); - // second iterator metaCache - int code = -1; - int64_t maxBufSize = 1024; - SKVRecord *pRecord; - void *pBuf = NULL; +// if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) { +// return -1; +// } - pBuf = malloc((size_t)maxBufSize); - if (pBuf == NULL) { - goto _err; - } +// pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord)); +// pMFile->info.nDels++; +// pMFile->info.nRecords--; +// pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); - // init Comp - assert(pfs->metaCacheComp == NULL); - pfs->metaCacheComp = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (pfs->metaCacheComp == NULL) { - goto _err; - } +// taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid)); +// return 0; +// } - pRecord = taosHashIterate(pfs->metaCache, NULL); - while (pRecord) { - if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { - tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), - tstrerror(terrno)); - goto _err; - } - if (pRecord->size > maxBufSize) { - maxBufSize = pRecord->size; - void* tmp = realloc(pBuf, (size_t)maxBufSize); - if (tmp == NULL) { - goto _err; - } - pBuf = tmp; - } - int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); - if (nread < 0) { - tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), - tstrerror(terrno)); - goto _err; - } +// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { +// float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords); +// float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size); +// float compactRatio = (float)(tsTsdbMetaCompactRatio)/100; - if (nread < pRecord->size) { - tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", - REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); - goto _err; - } +// if (delPercent < compactRatio && tombPercent < compactRatio) { +// return 0; +// } - if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, true) < 0) { - tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid, - tstrerror(terrno)); - goto _err; - } +// if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { +// tsdbError("open meta file %s compact fail", pMFile->f.rname); +// return -1; +// } - pRecord = taosHashIterate(pfs->metaCache, pRecord); - } - code = 0; - -_err: - if (code == 0) TSDB_FILE_FSYNC(&mf); - tsdbCloseMFile(&mf); - tsdbCloseMFile(pMFile); - - if (code == 0) { - // rename meta.tmp -> meta - tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), TSDB_FILE_FULL_NAME(pMFile)); - taosRename(mf.f.aname,pMFile->f.aname); - tstrncpy(mf.f.aname, pMFile->f.aname, TSDB_FILENAME_LEN); - tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN); - // update current meta file info - pfs->nstatus->pmf = NULL; - tsdbUpdateMFile(pfs, &mf); - - taosHashCleanup(pfs->metaCache); - pfs->metaCache = pfs->metaCacheComp; - pfs->metaCacheComp = NULL; - } else { - // remove meta.tmp file - remove(mf.f.aname); - taosHashCleanup(pfs->metaCacheComp); - pfs->metaCacheComp = NULL; - } +// tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 +// ",size:%" PRId64, +// tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size); - tfree(pBuf); +// SMFile mf; +// SDiskID did; - ASSERT(mf.info.nDels == 0); - ASSERT(mf.info.tombSize == 0); +// // first create tmp meta file +// did.level = TFS_PRIMARY_LEVEL; +// did.id = TFS_PRIMARY_ID; +// tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)) + 1); - tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, - code,mf.info.nRecords,mf.info.size); - return code; -} +// if (tsdbCreateMFile(&mf, true) < 0) { +// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } -// =================== Commit Time-Series Data -#if 0 -static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { - for (int i = 0; i < nIters; i++) { - TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); - if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return true; - } - return false; -} -#endif +// tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); +// // second iterator metaCache +// int code = -1; +// int64_t maxBufSize = 1024; +// SKVRecord *pRecord; +// void *pBuf = NULL; +// pBuf = malloc((size_t)maxBufSize); +// if (pBuf == NULL) { +// goto _err; +// } +// // init Comp +// assert(pfs->metaCacheComp == NULL); +// pfs->metaCacheComp = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); +// if (pfs->metaCacheComp == NULL) { +// goto _err; +// } +// pRecord = taosHashIterate(pfs->metaCache, NULL); +// while (pRecord) { +// if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { +// tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), +// tstrerror(terrno)); +// goto _err; +// } +// if (pRecord->size > maxBufSize) { +// maxBufSize = pRecord->size; +// void* tmp = realloc(pBuf, (size_t)maxBufSize); +// if (tmp == NULL) { +// goto _err; +// } +// pBuf = tmp; +// } +// int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); +// if (nread < 0) { +// tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), +// tstrerror(terrno)); +// goto _err; +// } + +// if (nread < pRecord->size) { +// tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", +// REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); +// goto _err; +// } + +// if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, true) < 0) { +// tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid, +// tstrerror(terrno)); +// goto _err; +// } + +// pRecord = taosHashIterate(pfs->metaCache, pRecord); +// } +// code = 0; + +// _err: +// if (code == 0) TSDB_FILE_FSYNC(&mf); +// tsdbCloseMFile(&mf); +// tsdbCloseMFile(pMFile); + +// if (code == 0) { +// // rename meta.tmp -> meta +// tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), +// TSDB_FILE_FULL_NAME(pMFile)); taosRename(mf.f.aname,pMFile->f.aname); tstrncpy(mf.f.aname, pMFile->f.aname, +// TSDB_FILENAME_LEN); tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN); +// // update current meta file info +// pfs->nstatus->pmf = NULL; +// tsdbUpdateMFile(pfs, &mf); + +// taosHashCleanup(pfs->metaCache); +// pfs->metaCache = pfs->metaCacheComp; +// pfs->metaCacheComp = NULL; +// } else { +// // remove meta.tmp file +// remove(mf.f.aname); +// taosHashCleanup(pfs->metaCacheComp); +// pfs->metaCacheComp = NULL; +// } + +// tfree(pBuf); + +// ASSERT(mf.info.nDels == 0); +// ASSERT(mf.info.tombSize == 0); + +// tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, +// code,mf.info.nRecords,mf.info.size); +// return code; +// } + +// // =================== Commit Time-Series Data +// #if 0 +// static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { +// for (int i = 0; i < nIters; i++) { +// TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); +// if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return true; +// } +// return false; +// } +// #endif static int tsdbCommitToTable(SCommitH *pCommith, int tid) { SCommitIter *pIter = pCommith->iters + tid; @@ -896,17 +1016,13 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { tsdbResetCommitTable(pCommith); - TSDB_RLOCK_TABLE(pIter->pTable); - // Set commit table if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } // No disk data and no memory data, just return if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { - TSDB_RUNLOCK_TABLE(pIter->pTable); return 0; } @@ -917,7 +1033,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { if (pCommith->readh.pBlkIdx) { if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } @@ -938,7 +1053,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { if (tsdbMoveBlock(pCommith, bidx) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } @@ -949,43 +1063,41 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { pBlock = NULL; } } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { - // merge pBlock data and memory data - if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - - bidx++; - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - nextKey = tsdbNextIterKey(pIter->pIter); + // // merge pBlock data and memory data + // if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { + // TSDB_RUNLOCK_TABLE(pIter->pTable); + // return -1; + // } + + // bidx++; + // if (bidx < nBlocks) { + // pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + // } else { + // pBlock = NULL; + // } + // nextKey = tsdbNextIterKey(pIter->pIter); } else { - // Only commit memory data - if (pBlock == NULL) { - if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - } else { - if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - } - nextKey = tsdbNextIterKey(pIter->pIter); + // // Only commit memory data + // if (pBlock == NULL) { + // if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { + // TSDB_RUNLOCK_TABLE(pIter->pTable); + // return -1; + // } + // } else { + // if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { + // TSDB_RUNLOCK_TABLE(pIter->pTable); + // return -1; + // } + // } + // nextKey = tsdbNextIterKey(pIter->pIter); } } - TSDB_RUNLOCK_TABLE(pIter->pTable); - - if (tsdbWriteBlockInfo(pCommith) < 0) { - tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), - TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); - return -1; - } + // if (tsdbWriteBlockInfo(pCommith) < 0) { + // tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), + // TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); + // return -1; + // } return 0; } @@ -1023,8 +1135,8 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { } } -int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, - bool isLast, bool isSuper, void **ppBuf, void **ppCBuf) { +int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, + bool isSuper, void **ppBuf, void **ppCBuf) { STsdbCfg * pCfg = REPO_CFG(pRepo); SBlockData *pBlockData; int64_t offset = 0; @@ -1090,8 +1202,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo pBlockCol = pBlockData->cols + tcol; tptr = POINTER_SHIFT(pBlockData, lsize); - if (pCfg->compression == TWO_STAGE_COMP && - tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { + if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { return -1; } @@ -1162,132 +1273,133 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)))); } +// static int tsdbWriteBlockInfo(SCommitH *pCommih) { +// SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); +// SBlockIdx blkIdx; +// STable * pTable = TSDB_COMMIT_TABLE(pCommih); -static int tsdbWriteBlockInfo(SCommitH *pCommih) { - SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); - SBlockIdx blkIdx; - STable * pTable = TSDB_COMMIT_TABLE(pCommih); - - if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))), - &blkIdx) < 0) { - return -1; - } - - if (blkIdx.numOfBlocks == 0) { - return 0; - } - - if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - return 0; -} +// if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void +// **)(&(TSDB_COMMIT_BUF(pCommih))), +// &blkIdx) < 0) { +// return -1; +// } -static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg * pCfg = REPO_CFG(pRepo); - SMergeInfo mInfo; - int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); - SDFile * pDFile; - bool isLast; - SBlock block; +// if (blkIdx.numOfBlocks == 0) { +// return 0; +// } - while (true) { - tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, - pCfg->update, &mInfo); +// if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// return -1; +// } - if (pCommith->pDataCols->numOfRows <= 0) break; +// return 0; +// } - if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isLast = false; - } else { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - isLast = true; - } +// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { +// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); +// STsdbCfg * pCfg = REPO_CFG(pRepo); +// SMergeInfo mInfo; +// int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); +// SDFile * pDFile; +// bool isLast; +// SBlock block; + +// while (true) { +// tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, +// pCfg->update, &mInfo); + +// if (pCommith->pDataCols->numOfRows <= 0) break; + +// if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { +// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); +// isLast = false; +// } else { +// pDFile = TSDB_COMMIT_LAST_FILE(pCommith); +// isLast = true; +// } - if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; +// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; - if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { - return -1; - } - } +// if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { +// return -1; +// } +// } - return 0; -} +// return 0; +// } -static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg * pCfg = REPO_CFG(pRepo); - int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; - SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - TSKEY keyLimit; - int16_t colId = 0; - SMergeInfo mInfo; - SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; - SBlock block, supBlock; - SDFile * pDFile; - - if (bidx == nBlocks - 1) { - keyLimit = pCommith->maxKey; - } else { - keyLimit = pBlock[1].keyFirst - 1; - } +// static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { +// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); +// STsdbCfg * pCfg = REPO_CFG(pRepo); +// int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; +// SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; +// TSKEY keyLimit; +// int16_t colId = 0; +// SMergeInfo mInfo; +// SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; +// SBlock block, supBlock; +// SDFile * pDFile; + +// if (bidx == nBlocks - 1) { +// keyLimit = pCommith->maxKey; +// } else { +// keyLimit = pBlock[1].keyFirst - 1; +// } - SSkipListIterator titer = *(pIter->pIter); - if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1; +// SSkipListIterator titer = *(pIter->pIter); +// if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1; - tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, - pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); +// tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, +// pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); - if (mInfo.nOperations == 0) { - // no new data to insert (all updates denied) - if (tsdbMoveBlock(pCommith, bidx) < 0) { - return -1; - } - *(pIter->pIter) = titer; - } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { - // Ignore the block - ASSERT(0); - *(pIter->pIter) = titer; - } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { - // Add a sub-block - tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, - pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, - &mInfo); - if (pBlock->last) { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - } +// if (mInfo.nOperations == 0) { +// // no new data to insert (all updates denied) +// if (tsdbMoveBlock(pCommith, bidx) < 0) { +// return -1; +// } +// *(pIter->pIter) = titer; +// } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { +// // Ignore the block +// ASSERT(0); +// *(pIter->pIter) = titer; +// } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { +// // Add a sub-block +// tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, +// pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, +// pCfg->update, &mInfo); +// if (pBlock->last) { +// pDFile = TSDB_COMMIT_LAST_FILE(pCommith); +// } else { +// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); +// } - if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; +// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; - if (pBlock->numOfSubBlocks == 1) { - subBlocks[0] = *pBlock; - subBlocks[0].numOfSubBlocks = 0; - } else { - memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), - sizeof(SBlock) * pBlock->numOfSubBlocks); - } - subBlocks[pBlock->numOfSubBlocks] = block; - supBlock = *pBlock; - supBlock.keyFirst = mInfo.keyFirst; - supBlock.keyLast = mInfo.keyLast; - supBlock.numOfSubBlocks++; - supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; - supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); - - if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; - } else { - if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; - if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; - } +// if (pBlock->numOfSubBlocks == 1) { +// subBlocks[0] = *pBlock; +// subBlocks[0].numOfSubBlocks = 0; +// } else { +// memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), +// sizeof(SBlock) * pBlock->numOfSubBlocks); +// } +// subBlocks[pBlock->numOfSubBlocks] = block; +// supBlock = *pBlock; +// supBlock.keyFirst = mInfo.keyFirst; +// supBlock.keyLast = mInfo.keyLast; +// supBlock.numOfSubBlocks++; +// supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; +// supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); + +// if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; +// } else { +// if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; +// if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return +// -1; +// } - return 0; -} +// return 0; +// } static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; @@ -1342,113 +1454,107 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const return 0; } -static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlock block; - SDFile * pDFile; - bool isLast; - int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); - - int biter = 0; - while (true) { - tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, - pCfg->update); - - if (pCommith->pDataCols->numOfRows == 0) break; - - if (isLastOneBlock) { - if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - isLast = true; - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isLast = false; - } - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isLast = false; - } - - if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; - if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; - } - - return 0; -} - -static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, - TSKEY maxKey, int maxRows, int8_t update) { - TSKEY key1 = INT64_MAX; - TSKEY key2 = INT64_MAX; - STSchema *pSchema = NULL; - - ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); - tdResetDataCols(pTarget); - - while (true) { - key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); - SMemRow row = tsdbNextIterRow(pCommitIter->pIter); - if (row == NULL || memRowKey(row) > maxKey) { - key2 = INT64_MAX; - } else { - key2 = memRowKey(row); - } - - if (key1 == INT64_MAX && key2 == INT64_MAX) break; - - if (key1 < key2) { - for (int i = 0; i < pDataCols->numOfCols; i++) { - //TODO: dataColAppendVal may fail - dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints); - } - - pTarget->numOfRows++; - (*iter)++; - } else if (key1 > key2) { - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendMemRowToDataCol(row, pSchema, pTarget, true); +// static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool +// isLastOneBlock) { +// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); +// STsdbCfg * pCfg = REPO_CFG(pRepo); +// SBlock block; +// SDFile * pDFile; +// bool isLast; +// int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); + +// int biter = 0; +// while (true) { +// tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, +// pCfg->update); + +// if (pCommith->pDataCols->numOfRows == 0) break; + +// if (isLastOneBlock) { +// if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { +// pDFile = TSDB_COMMIT_LAST_FILE(pCommith); +// isLast = true; +// } else { +// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); +// isLast = false; +// } +// } else { +// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); +// isLast = false; +// } - tSkipListIterNext(pCommitIter->pIter); - } else { - if (update != TD_ROW_OVERWRITE_UPDATE) { - //copy disk data - for (int i = 0; i < pDataCols->numOfCols; i++) { - //TODO: dataColAppendVal may fail - dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints); - } +// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; +// if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; +// } - if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; - } - if (update != TD_ROW_DISCARD_UPDATE) { - //copy mem data - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } +// return 0; +// } - tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); - } - (*iter)++; - tSkipListIterNext(pCommitIter->pIter); - } +// static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, +// TSKEY maxKey, int maxRows, int8_t update) { +// TSKEY key1 = INT64_MAX; +// TSKEY key2 = INT64_MAX; +// STSchema *pSchema = NULL; + +// ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); +// tdResetDataCols(pTarget); + +// while (true) { +// key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); +// SMemRow row = tsdbNextIterRow(pCommitIter->pIter); +// if (row == NULL || memRowKey(row) > maxKey) { +// key2 = INT64_MAX; +// } else { +// key2 = memRowKey(row); +// } - if (pTarget->numOfRows >= maxRows) break; - } -} +// if (key1 == INT64_MAX && key2 == INT64_MAX) break; + +// if (key1 < key2) { +// for (int i = 0; i < pDataCols->numOfCols; i++) { +// //TODO: dataColAppendVal may fail +// dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, +// pTarget->maxPoints); +// } + +// pTarget->numOfRows++; +// (*iter)++; +// } else if (key1 > key2) { +// if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { +// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); +// ASSERT(pSchema != NULL); +// } + +// tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + +// tSkipListIterNext(pCommitIter->pIter); +// } else { +// if (update != TD_ROW_OVERWRITE_UPDATE) { +// //copy disk data +// for (int i = 0; i < pDataCols->numOfCols; i++) { +// //TODO: dataColAppendVal may fail +// dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, +// pTarget->maxPoints); +// } + +// if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; +// } +// if (update != TD_ROW_DISCARD_UPDATE) { +// //copy mem data +// if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { +// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); +// ASSERT(pSchema != NULL); +// } + +// tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); +// } +// (*iter)++; +// tSkipListIterNext(pCommitIter->pIter); +// } -static void tsdbResetCommitFile(SCommitH *pCommith) { - pCommith->isRFileSet = false; - pCommith->isDFileSame = false; - pCommith->isLFileSame = false; - taosArrayClear(pCommith->aBlkIdx); -} +// if (pTarget->numOfRows >= maxRows) break; +// } +// } static void tsdbResetCommitTable(SCommitH *pCommith) { taosArrayClear(pCommith->aSubBlk); @@ -1456,131 +1562,6 @@ static void tsdbResetCommitTable(SCommitH *pCommith) { pCommith->pTable = NULL; } -static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { - SDiskID did; - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); - SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); - - tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id)); - if (did.level == TFS_UNDECIDED_LEVEL) { - terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; - return -1; - } - - // Open read FSET - if (pSet) { - if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) { - return -1; - } - - pCommith->isRFileSet = true; - - if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - - tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet), - TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); - } else { - pCommith->isRFileSet = false; - } - - // Set and open commit FSET - if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) { - // Create a new FSET to write data - tsdbInitDFileSet(pWSet, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo))); - - if (tsdbCreateDFileSet(pWSet, true) < 0) { - tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo), - TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno)); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - } - return -1; - } - - pCommith->isDFileSame = false; - pCommith->isLFileSame = false; - - tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), - TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet)); - } else { - did.level = TSDB_FSET_LEVEL(pSet); - did.id = TSDB_FSET_ID(pSet); - - pCommith->wSet.fid = fid; - pCommith->wSet.state = 0; - - // TSDB_FILE_HEAD - SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); - tsdbInitDFile(pWHeadf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); - if (tsdbCreateDFile(pWHeadf, true) < 0) { - tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf), - tstrerror(terrno)); - - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - - // TSDB_FILE_DATA - SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); - SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); - tsdbInitDFileEx(pWDataf, pRDataf); - if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { - tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - pCommith->isDFileSame = true; - - // TSDB_FILE_LAST - SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh)); - SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith); - if (pRLastf->info.size < 32 * 1024) { - tsdbInitDFileEx(pWLastf, pRLastf); - pCommith->isLFileSame = true; - - if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { - tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - } else { - tsdbInitDFile(pWLastf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); - pCommith->isLFileSame = false; - - if (tsdbCreateDFile(pWLastf, true) < 0) { - tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - (void)tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - } - } - - return 0; -} - static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); @@ -1592,46 +1573,45 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } -static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg * pCfg = REPO_CFG(pRepo); - int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; +// static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { +// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); +// STsdbCfg * pCfg = REPO_CFG(pRepo); +// int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; - ASSERT(mergeRows > 0); +// ASSERT(mergeRows > 0); - if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { - if (pBlock->last) { - if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; - } else { - if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; - } - } - - return false; -} - -int tsdbApplyRtn(STsdbRepo *pRepo) { - SRtn rtn; - SFSIter fsiter; - STsdbFS * pfs = REPO_FS(pRepo); - SDFileSet *pSet; +// if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { +// if (pBlock->last) { +// if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; +// } else { +// if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; +// } +// } - // Get retention snapshot - tsdbGetRtnSnap(pRepo, &rtn); +// return false; +// } - tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); - while ((pSet = tsdbFSIterNext(&fsiter))) { - if (pSet->fid < rtn.minFid) { - tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, - TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); - continue; - } +// int tsdbApplyRtn(STsdbRepo *pRepo) { +// SRtn rtn; +// SFSIter fsiter; +// STsdbFS * pfs = REPO_FS(pRepo); +// SDFileSet *pSet; + +// // Get retention snapshot +// tsdbGetRtnSnap(pRepo, &rtn); + +// tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); +// while ((pSet = tsdbFSIterNext(&fsiter))) { +// if (pSet->fid < rtn.minFid) { +// tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, +// TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); +// continue; +// } - if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) { - return -1; - } - } +// if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) { +// return -1; +// } +// } - return 0; -} -#endif \ No newline at end of file +// return 0; +// } \ No newline at end of file -- GitLab