diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 2e69640a065f9049f2bcb2a2c3f190f0178fca3f..c327b64fdba070362982946dd384718bcfd1958a 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -35,6 +35,7 @@ typedef struct SColVal SColVal; typedef struct STSRow2 STSRow2; typedef struct STSRowBuilder STSRowBuilder; typedef struct SColData SColData; +typedef struct SColDataBatch SColDataBatch; typedef struct STagVal STagVal; typedef struct STag STag; diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 5ab1dffaaa25585572aca9577ecd4f00f0c466b2..b98568d75bcfc228929563a1a85c0dced9ea6565 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -128,7 +128,7 @@ typedef struct SDFileSetWriter SDFileSetWriter; // SDFileSetWriter int32_t tsdbDFileSetWriterOpen(SDFileSetWriter *pWriter, STsdb *pTsdb, SDFileSet *pSet); -int32_t tsdbDFileSetWriterClose(SDFileSetWriter *pWriter); +int32_t tsdbDFileSetWriterClose(SDFileSetWriter *pWriter, int8_t sync); int32_t tsdbWriteBlockData(SDFileSetWriter *pWriter, SDataCols *pDataCols, SBlock *pBlock); int32_t tsdbWriteSBlockInfo(SDFileSetWriter *pWriter, SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx); int32_t tsdbWriteSBlockIdx(SDFileSetWriter *pWriter, SBlockIdx *pBlockIdx); @@ -145,6 +145,8 @@ int32_t tsdbLoadSBlockStatis(SDFileSetReader *pReader, SBlock *pBlock, SBlockSta // SDelFReader // tsdbUtil.c ============================================================================================== +int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision); + int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size); void tsdbFree(uint8_t *pBuf); @@ -282,6 +284,8 @@ struct SBlockIdxItem { }; struct SBlockIdx { + int64_t suid; + int64_t uid; uint32_t delimiter; SOffset offset; uint32_t nData; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index e210a47868ac9b00626728852f63e0868c613c1a..7e5cd6cf95c3e28c7bda372ca4d2e8d74f63b96f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -34,10 +34,12 @@ struct SCommitter { TSKEY maxKey; SDFileSetReader *pReader; SDFileSetWriter *pWriter; - SArray *aOBlockIdx; - SArray *aBlockIdx; + SMapData oBlockIdx; + SMapData nBlockIdx; // commit table data SBlockIdx *pBlockIdx; + SMapData oBlock; + SMapData nBlock; /* commit del */ SDelFReader *pDelFReader; SDelFWriter *pDelFWriter; @@ -111,7 +113,6 @@ _err: return code; } -// ===================================== NEW ====================================== static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { int32_t code = 0; @@ -137,7 +138,6 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { // no data, just return if (pMemTable->nRow == 0) { - tsdbDebug("vgId:%d no data to commit", TD_VID(pTsdb->pVnode)); goto _exit; } @@ -160,11 +160,11 @@ static int32_t tsdbCommitData(SCommitter *pCommitter) { } _exit: - tsdbDebug("vgId:%d commit TSDB data, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow); + tsdbDebug("vgId:%d commit data done, nRow:%" PRId64, TD_VID(pTsdb->pVnode), pMemTable->nRow); return code; _err: - tsdbError("vgId:%d failed to commit TSDB data since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d commit data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -368,14 +368,10 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter); static int32_t tsdbCommitDataImpl(SCommitter *pCommitter) { int32_t code = 0; - for (;;) { - if (pCommitter->nextCommitKey == TSKEY_MAX) break; - - pCommitter->commitFid = TSDB_KEY_FID(pCommitter->nextCommitKey, pCommitter->minutes, pCommitter->precision); + while (pCommitter->nextCommitKey < TSKEY_MAX) { + pCommitter->commitFid = tsdbKeyFid(pCommitter->nextCommitKey, pCommitter->minutes, pCommitter->precision); code = tsdbCommitFileData(pCommitter); - if (code) { - goto _err; - } + if (code) goto _err; } _exit: @@ -425,36 +421,33 @@ _err: static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; - SDFileSet *pRSet = NULL; - SDFileSet *pWSet = NULL; - - taosArrayClear(pCommitter->aOBlockIdx); - taosArrayClear(pCommitter->aBlockIdx); - - // search pRSet (todo) + SDFileSet *pRSet = NULL; // TODO + SDFileSet *pWSet = NULL; // TODO - // open file to read + // load old + pCommitter->oBlockIdx.nItem = 0; + pCommitter->oBlockIdx.flag = 0; + pCommitter->oBlockIdx.nData = 0; if (pRSet) { - code = tsdbDFileSetReaderOpen(pCommitter->pReader, pTsdb, pRSet); + code = tsdbDFileSetReaderOpen(&pCommitter->pReader, pTsdb, pRSet); if (code) goto _err; - // create/open the pWSet (todo) - } else { - // create the pWSet (todo) + code = tsdbReadBlockIdx(pCommitter->pReader, &pCommitter->oBlockIdx); + if (code) goto _err; } - // open file for write - code = tsdbDFileSetWriterOpen(pCommitter->pWriter, pTsdb, pWSet); - if (code) goto _err; - - // read the SBlockIdx part for merge purpose - code = tsdbLoadSBlockIdx(pCommitter->pReader, pCommitter->aOBlockIdx); + // create new + pCommitter->nBlockIdx.nItem = 0; + pCommitter->nBlockIdx.flag = 0; + pCommitter->nBlockIdx.nData = 0; + code = tsdbDFileSetWriterOpen(&pCommitter->pWriter, pTsdb, pWSet); if (code) goto _err; _exit: return code; _err: + tsdbError("vgId:%d commit file data start failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); return code; } @@ -464,26 +457,29 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { int32_t code = 0; STsdb *pTsdb = pCommitter->pTsdb; SMemTable *pMemTable = pTsdb->imem; - int32_t iBlockIdx = 0; - int32_t nBlockIdx = taosArrayGetSize(pCommitter->aOBlockIdx); int32_t iTbData = 0; int32_t nTbData = taosArrayGetSize(pMemTable->aTbData); - SBlockIdx *pBlockIdx; + int32_t iBlockIdx = 0; + int32_t nBlockIdx = pCommitter->oBlockIdx.nItem; STbData *pTbData; + SBlockIdx *pBlockIdx; + SBlockIdx blockIdx; + int32_t c; while (iTbData < nTbData || iBlockIdx < nBlockIdx) { pTbData = NULL; pBlockIdx = NULL; - if (iTbData < nTbData) { pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData); } if (iBlockIdx < nBlockIdx) { - pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->aOBlockIdx, iBlockIdx); + tMapDataGetItemByIdx(&pCommitter->oBlockIdx, iBlockIdx, &blockIdx, NULL /* TODO */); + pBlockIdx = &blockIdx; } if (pTbData && pBlockIdx) { - int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx); + c = tTABLEIDCmprFn(pTbData, pBlockIdx); + if (c == 0) { iTbData++; iBlockIdx++; @@ -496,18 +492,24 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { } } else { if (pTbData) { - iTbData++; - } else { iBlockIdx++; } + if (pBlockIdx) { + iTbData++; + } + } + + if (pTbData && pTbData->sl.size == 0) { + pTbData = NULL; } + if (pTbData == NULL && pBlockIdx == NULL) continue; + pCommitter->pTbData = pTbData; pCommitter->pBlockIdx = pBlockIdx; + code = tsdbCommitTableData(pCommitter); - if (code) { - goto _err; - } + if (code) goto _err; } return code; @@ -518,7 +520,25 @@ _err: static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; - // TODO + + code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->nBlockIdx, NULL); + if (code) goto _err; + + code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL); + if (code) goto _err; + + code = tsdbDFileSetWriterClose(pCommitter->pWriter, 1); + if (code) goto _err; + + if (pCommitter->pReader) { + code = tsdbDFileSetReaderClose(pCommitter->pReader); + goto _err; + } + +_exit: + return code; + +_err: return code; } @@ -556,13 +576,47 @@ _err: static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter) { int32_t code = 0; - // TODO + + // old + pCommitter->oBlock.flag = 0; + pCommitter->oBlock.nItem = 0; + pCommitter->oBlock.nData = 0; + if (pCommitter->pBlockIdx) { + code = tsdbReadBlock(pCommitter->pReader, &pCommitter->oBlock, NULL); + if (code) goto _err; + } + + // new + pCommitter->nBlock.flag = 0; + pCommitter->nBlock.nItem = 0; + pCommitter->nBlock.nData = 0; + +_err: return code; } static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) { - int32_t code = 0; - // TODO + int32_t code = 0; + STbDataIter *pIter = NULL; + int32_t iBlock = 0; + int32_t nBlock = pCommitter->nBlock.nItem; + SBlock *pBlock; + SBlock block; + TSDBROW *pRow; + TSDBROW row; + + if (pCommitter->pTbData) { + code = tsdbTbDataIterCreate(pCommitter->pTbData, NULL, 0, &pIter); + if (code) goto _err; + } + + for (;;) { + /* code */ + } + + return code; + +_err: return code; } @@ -697,1734 +751,4 @@ static int32_t tsdbCommitTableDel(SCommitter *pCommitter) { _err: return code; -} - -// // ===================================== OLD ====================================== -#if 0 -struct SCommitIter { - STable *pTable; - STbDataIter *pIter; -}; - -#define TSDB_MAX_SUBBLOCKS 8 -#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) -#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) -#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh))) -#define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet)) -#define TSDB_COMMIT_TABLE(ch) ((ch)->pTable) -#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) -#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) -#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) -#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD) -#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL) -#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) -#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) -#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh)) -#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows) -#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) - -static int tsdbInitCommitH(SCommitter *pCommith, STsdb *pRepo); -static void tsdbSeekCommitIter(SCommitter *pCommith, TSKEY key); -static int tsdbNextCommitFid(SCommitter *pCommith); -static void tsdbDestroyCommitH(SCommitter *pCommith); -static int32_t tsdbCreateCommitIters(SCommitter *pCommith); -static void tsdbDestroyCommitIters(SCommitter *pCommith); -static int tsdbCommitToFile(SCommitter *pCommith, SDFileSet *pSet, int fid); -static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int fid); -static int tsdbCommitToTable(SCommitter *pCommith, int tid); -static bool tsdbCommitIsSameFile(SCommitter *pCommith, int bidx); -static int tsdbMoveBlkIdx(SCommitter *pCommith, SBlockIdx *pIdx); -static int tsdbSetCommitTable(SCommitter *pCommith, STable *pTable); -static int tsdbComparKeyBlock(const void *arg1, const void *arg2); -static int tsdbWriteBlockInfo(SCommitter *pCommih); -static int tsdbCommitMemData(SCommitter *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); -static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx); -static int tsdbMoveBlock(SCommitter *pCommith, int bidx); -static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int -nSubBlocks); static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY -keyLimit, - bool isLastOneBlock); -static void tsdbResetCommitTable(SCommitter *pCommith); -static void tsdbCloseCommitFile(SCommitter *pCommith, bool hasError); -static bool tsdbCanAddSubBlock(SCommitter *pCommith, SBlock *pBlock, SMergeInfo *pInfo); -static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, - SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); -static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); -static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); -static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, - SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, - SMergeInfo *pMergeInfo); - -static int32_t tsdbCommitData(SCommitter *pCommith) { - int32_t fid; - SDFileSet *pSet = NULL; - int32_t code = 0; - STsdb *pTsdb = TSDB_COMMIT_REPO(pCommith); - - // Skip expired memory data and expired FSET - tsdbSeekCommitIter(pCommith, pCommith->rtn.minKey); - while ((pSet = tsdbFSIterNext(&(pCommith->fsIter)))) { - if (pSet->fid < pCommith->rtn.minFid) { - tsdbInfo("vgId:%d, FSET %d on level %d disk id %d expires, remove it", REPO_ID(pTsdb), pSet->fid, - TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); - } else { - break; - } - } - - // commit - fid = tsdbNextCommitFid(pCommith); - while (true) { - // Loop over both on disk and memory - if (pSet == NULL && fid == TSDB_IVLD_FID) break; - - if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) { - // Only has existing FSET but no memory data to commit in this - // existing FSET, only check if file in correct retention - if (tsdbApplyRtnOnFSet(TSDB_COMMIT_REPO(pCommith), pSet, &(pCommith->rtn)) < 0) { - tsdbDestroyCommitH(pCommith); - return -1; - } - - pSet = tsdbFSIterNext(&(pCommith->fsIter)); - } else { - // Has memory data to commit - SDFileSet *pCSet; - int cfid; - - if (pSet == NULL || pSet->fid > fid) { - // Commit to a new FSET with fid: fid - pCSet = NULL; - cfid = fid; - } else { - // Commit to an existing FSET - pCSet = pSet; - cfid = pSet->fid; - pSet = tsdbFSIterNext(&(pCommith->fsIter)); - } - - if (tsdbCommitToFile(pCommith, pCSet, cfid) < 0) { - tsdbDestroyCommitH(pCommith); - return -1; - } - - fid = tsdbNextCommitFid(pCommith); - } - } - - return code; -} - -static int32_t tsdbCommitDel(SCommitter *pCommith) { - int32_t code = 0; - // TODO - return code; -} - -static int32_t tsdbCommitCache(SCommitter *pCommith) { - int32_t code = 0; - // TODO - return code; -} - -static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { - SDiskID did; - SDFileSet nSet = {0}; - STsdbFS *pfs = REPO_FS(pRepo); - int level; - - ASSERT(pSet->fid >= pRtn->minFid); - - level = tsdbGetFidLevel(pSet->fid, pRtn); - - if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) { - terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; - return -1; - } - - if (did.level > TSDB_FSET_LEVEL(pSet)) { - // Need to move the FSET to higher level - tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs)); - - if (tsdbCopyDFileSet(pSet, &nSet) < 0) { - tsdbError("vgId:%d, failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, - TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno)); - return -1; - } - - if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { - return -1; - } - - tsdbInfo("vgId:%d, FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid, - TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id); - } else { - // On a correct level - if (tsdbUpdateDFileSet(pfs, pSet) < 0) { - return -1; - } - } - - return 0; -} - -static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle) { - int32_t code = 0; - - tsdbInfo("vgId:%d, start to commit", REPO_ID(pTsdb)); - - ASSERT(pTsdb->imem == NULL && pTsdb->mem); - pTsdb->imem = pTsdb->mem; - pTsdb->mem = NULL; - if (tsdbInitCommitH(pCHandle, pTsdb) < 0) { - return -1; - } - - tsdbStartFSTxn(pTsdb, 0, 0); - - return code; -} - -static int32_t tsdbEndCommit(SCommitter *pCHandle, int eno) { - int32_t code = 0; - STsdb *pTsdb = TSDB_COMMIT_REPO(pCHandle); - - tsdbDestroyCommitH(pCHandle); - tsdbEndFSTxn(pTsdb); - tsdbMemTableDestroy(pTsdb->imem); - pTsdb->imem = NULL; - - tsdbInfo("vgId:%d, commit over, %s", REPO_ID(pTsdb), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); - - return code; -} - -static int tsdbInitCommitH(SCommitter *pCommith, STsdb *pRepo) { - STsdbCfg *pCfg = REPO_CFG(pRepo); - - memset(pCommith, 0, sizeof(*pCommith)); - tsdbGetRtnSnap(pRepo, &(pCommith->rtn)); - - TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith)); - - // Init read handle - if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) { - return -1; - } - - // Init file iterator - tsdbFSIterInit(&(pCommith->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); - - if (tsdbCreateCommitIters(pCommith) < 0) { - tsdbDestroyCommitH(pCommith); - return -1; - } - - pCommith->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); - if (pCommith->aBlkIdx == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyCommitH(pCommith); - return -1; - } - - pCommith->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); - if (pCommith->aSupBlk == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyCommitH(pCommith); - return -1; - } - - pCommith->aSubBlk = taosArrayInit(1024, sizeof(SBlock)); - if (pCommith->aSubBlk == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyCommitH(pCommith); - return -1; - } - - pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRows); - if (pCommith->pDataCols == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbDestroyCommitH(pCommith); - return -1; - } - - return 0; -} - -// Skip all keys until key (not included) -static void tsdbSeekCommitIter(SCommitter *pCommith, TSKEY key) { - for (int i = 0; i < pCommith->niters; i++) { - SCommitIter *pIter = pCommith->iters + i; - if (pIter->pTable == NULL || pIter->pIter == NULL) continue; - - tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, - true, NULL); - } -} - -static int tsdbNextCommitFid(SCommitter *pCommith) { - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); - int fid = TSDB_IVLD_FID; - - for (int i = 0; i < pCommith->niters; i++) { - SCommitIter *pIter = pCommith->iters + i; - // if (pIter->pTable == NULL || pIter->pIter == NULL) continue; - - TSKEY nextKey = tsdbNextIterKey(pIter->pIter); - if (nextKey == TSDB_DATA_TIMESTAMP_NULL) { - continue; - } else { - int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->days, pCfg->precision)); - if (fid == TSDB_IVLD_FID || fid > tfid) { - fid = tfid; - } - } - } - - return fid; -} - -static void tsdbDestroyCommitH(SCommitter *pCommith) { - pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols); - pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk); - pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk); - pCommith->aBlkIdx = taosArrayDestroy(pCommith->aBlkIdx); - tsdbDestroyCommitIters(pCommith); - tsdbDestroyReadH(&(pCommith->readh)); - tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); -} - -static int32_t tsdbCommitToFileStart(SCommitter *pCHandle, SDFileSet *pSet, int32_t fid) { - int32_t code = 0; - STsdb *pRepo = TSDB_COMMIT_REPO(pCHandle); - STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); - - ASSERT(pSet == NULL || pSet->fid == fid); - - pCHandle->fid = fid; - pCHandle->pSet = pSet; - pCHandle->isRFileSet = false; - pCHandle->isDFileSame = false; - pCHandle->isLFileSame = false; - taosArrayClear(pCHandle->aBlkIdx); - - tsdbGetFidKeyRange(pCfg->days, pCfg->precision, fid, &(pCHandle->minKey), &(pCHandle->maxKey)); - - code = tsdbSetAndOpenCommitFile(pCHandle, pSet, fid); - - return code; -} -static int32_t tsdbCommitToFileImpl(SCommitter *pCHandle) { - int32_t code = 0; - // TODO - return code; -} -static int32_t tsdbCommitToFileEnd(SCommitter *pCommith) { - int32_t code = 0; - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - - if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) - < - 0) { - tsdbError("vgId:%d, failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), pCommith->fid, - tstrerror(terrno)); - tsdbCloseCommitFile(pCommith, true); - // revert the file change - tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pCommith->pSet); - return -1; - } - - if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) { - tsdbError("vgId:%d, failed to update FSET %d header since %s", REPO_ID(pRepo), pCommith->fid, tstrerror(terrno)); - tsdbCloseCommitFile(pCommith, true); - // revert the file change - tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pCommith->pSet); - return -1; - } - - // Close commit file - tsdbCloseCommitFile(pCommith, false); - - if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { - return -1; - } - - return code; -} -static int32_t tsdbCommitToFile(SCommitter *pCommith, SDFileSet *pSet, int fid) { - int32_t code = 0; - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); - - // commit to file start - code = tsdbCommitToFileStart(pCommith, pSet, fid); - if (code) { - goto _err; - } - - // Loop to commit each table data in mem and file - int mIter = 0, fIter = 0; - int nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx); - - while (true) { - SBlockIdx *pIdx = NULL; - SCommitIter *pIter = NULL; - if (mIter < pCommith->niters) { - pIter = pCommith->iters + mIter; - if (fIter < nBlkIdx) { - pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter); - } - } else if (fIter < nBlkIdx) { - pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter); - } else { - break; - } - - if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->suid <= pIdx->suid || pIter->pTable->uid <= pIdx->uid))) - { - if (tsdbCommitToTable(pCommith, mIter) < 0) { - tsdbCloseCommitFile(pCommith, true); - // revert the file change - tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); - return -1; - } - - if (pIdx && (pIter->pTable->uid == pIdx->uid)) { - ++fIter; - } - ++mIter; - } else if (pIter && !pIter->pTable) { - // When table already dropped during commit, pIter is not NULL but pIter->pTable is NULL. - ++mIter; // skip the table and do nothing - } else if (pIdx) { - if (tsdbMoveBlkIdx(pCommith, pIdx) < 0) { - tsdbCloseCommitFile(pCommith, true); - // revert the file change - tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); - return -1; - } - ++fIter; - } - } - - // commit to file end - code = tsdbCommitToFileEnd(pCommith); - if (code) { - goto _err; - } - - return code; - -_err: - return code; -} - -static int32_t tsdbCreateCommitIters(SCommitter *pCommith) { - int32_t code = 0; - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - SMemTable *pMem = pRepo->imem; - STbData *pTbData; - SCommitIter *pCommitIter; - STSchema *pTSchema = NULL; - - pCommith->niters = taosArrayGetSize(pMem->aTbData); - pCommith->iters = (SCommitIter *)taosMemoryCalloc(pCommith->niters, sizeof(SCommitIter)); - if (pCommith->iters == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - for (int32_t iIter = 0; iIter < pCommith->niters; iIter++) { - pTbData = (STbData *)taosArrayGetP(pMem->aTbData, iIter); - pCommitIter = &pCommith->iters[iIter]; - - pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, -1); - if (pTSchema) { - tsdbTbDataIterCreate(pTbData, NULL, 0, &pCommitIter->pIter); - - pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable)); - pCommitIter->pTable->uid = pTbData->uid; - pCommitIter->pTable->suid = pTbData->suid; - pCommitIter->pTable->pSchema = pTSchema; - pCommitIter->pTable->pCacheSchema = NULL; - } - } - - return code; - -_err: - return code; -} - -static void tsdbDestroyCommitIters(SCommitter *pCommith) { - if (pCommith->iters == NULL) return; - - for (int i = 1; i < pCommith->niters; i++) { - tsdbTbDataIterDestroy(pCommith->iters[i].pIter); - if (pCommith->iters[i].pTable) { - tdFreeSchema(pCommith->iters[i].pTable->pSchema); - tdFreeSchema(pCommith->iters[i].pTable->pCacheSchema); - taosMemoryFreeClear(pCommith->iters[i].pTable); - } - } - - taosMemoryFree(pCommith->iters); - pCommith->iters = NULL; - pCommith->niters = 0; -} - -static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int fid) { - SDiskID did; - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); - - if (tfsAllocDisk(REPO_TFS(pRepo), tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) { - terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; - return -1; - } - - // Open read FSET - if (pSet) { - if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) { - return -1; - } - - pCommith->isRFileSet = true; - - if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - - tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), - TSDB_FSET_FID(pSet), TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); - } else { - pCommith->isRFileSet = false; - } - - // Set and open commit FSET - if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) { - // Create a new FSET to write data - tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo))); - - if (tsdbCreateDFileSet(pRepo, pWSet, true) < 0) { - tsdbError("vgId:%d, failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo), - TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno)); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - } - return -1; - } - - pCommith->isDFileSame = false; - pCommith->isLFileSame = false; - - tsdbDebug("vgId:%d, FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), - TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet)); - } else { - did.level = TSDB_FSET_LEVEL(pSet); - did.id = TSDB_FSET_ID(pSet); - - pCommith->wSet.fid = fid; - pCommith->wSet.state = 0; - - // TSDB_FILE_HEAD - SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); - tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); - if (tsdbCreateDFile(pRepo, pWHeadf, true, TSDB_FILE_HEAD) < 0) { - tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf), - tstrerror(terrno)); - - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - - // TSDB_FILE_DATA - SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); - SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); - tsdbInitDFileEx(pWDataf, pRDataf); - // if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { - if (tsdbOpenDFile(pWDataf, TD_FILE_WRITE) < 0) { - tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - pCommith->isDFileSame = true; - - // TSDB_FILE_LAST - SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh)); - SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith); - if (pRLastf->info.size < 32 * 1024) { - tsdbInitDFileEx(pWLastf, pRLastf); - pCommith->isLFileSame = true; - - // if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { - if (tsdbOpenDFile(pWLastf, TD_FILE_WRITE) < 0) { - tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - } else { - tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); - pCommith->isLFileSame = false; - - if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) { - tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), - TSDB_FILE_FULL_NAME(pWLastf), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - (void)tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - } - - // TSDB_FILE_SMAD - SDFile *pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh)); - SDFile *pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith); - - if (!taosCheckExistFile(TSDB_FILE_FULL_NAME(pRSmadF))) { - tsdbDebug("vgId:%d, create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF)); - tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD); - - if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) { - tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), - TSDB_FILE_FULL_NAME(pWSmadF), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - (void)tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - } else { - tsdbInitDFileEx(pWSmadF, pRSmadF); - if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) { - tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - } - - // TSDB_FILE_SMAL - SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh)); - SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith); - - if ((pCommith->isLFileSame) && taosCheckExistFile(TSDB_FILE_FULL_NAME(pRSmalF))) { - tsdbInitDFileEx(pWSmalF, pRSmalF); - if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) { - tsdbError("vgId:%d, failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - } else { - tsdbDebug("vgId:%d, create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF)); - tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL); - - if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) { - tsdbError("vgId:%d, failed to create file %s to commit since %s", REPO_ID(pRepo), - TSDB_FILE_FULL_NAME(pWSmalF), - tstrerror(terrno)); - - tsdbCloseDFileSet(pWSet); - (void)tsdbRemoveDFile(pWHeadf); - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - return -1; - } - } - } - } - - return 0; -} - -// extern int32_t tsTsdbMetaCompactRatio; - -static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, - SBlockIdx *pIdx) { - size_t nSupBlocks; - size_t nSubBlocks; - uint32_t tlen; - SBlockInfo *pBlkInfo; - int64_t offset; - SBlock *pBlock; - - memset(pIdx, 0, sizeof(*pIdx)); - - nSupBlocks = taosArrayGetSize(pSupA); - nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); - - if (nSupBlocks <= 0) { - // No data (data all deleted) - return 0; - } - - tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); - if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; - pBlkInfo = *ppBuf; - - pBlkInfo->delimiter = TSDB_FILE_DELIMITER; - pBlkInfo->suid = pTable->suid; - pBlkInfo->uid = pTable->uid; - - memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); - if (nSubBlocks > 0) { - memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); - - for (int i = 0; i < nSupBlocks; i++) { - pBlock = pBlkInfo->blocks + i; - - if (pBlock->numOfSubBlocks > 1) { - pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); - } - } - } - - taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); - - if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { - return -1; - } - - tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); - - // Set pIdx - pBlock = taosArrayGetLast(pSupA); - - pIdx->suid = pTable->suid; - pIdx->uid = pTable->uid; - pIdx->hasLast = pBlock->last ? 1 : 0; - pIdx->maxKey = pBlock->maxKey; - pIdx->numOfBlocks = (uint32_t)nSupBlocks; - pIdx->len = tlen; - pIdx->offset = (uint32_t)offset; - - return 0; -} - -static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { - SBlockIdx *pBlkIdx; - size_t nidx = taosArrayGetSize(pIdxA); - int tlen = 0, size; - int64_t offset; - - if (nidx <= 0) { - // All data are deleted - pHeadf->info.offset = 0; - pHeadf->info.len = 0; - return 0; - } - - for (size_t i = 0; i < nidx; i++) { - pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); - - size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); - if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; - - void *ptr = POINTER_SHIFT(*ppBuf, tlen); - tsdbEncodeSBlockIdx(&ptr, pBlkIdx); - - tlen += size; - } - - tlen += sizeof(TSCKSUM); - if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; - taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); - - if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { - return -1; - } - - tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); - pHeadf->info.offset = (uint32_t)offset; - pHeadf->info.len = tlen; - - return 0; -} - -// =================== Commit Time-Series Data -static int tsdbCommitToTable(SCommitter *pCommith, int tid) { - SCommitIter *pIter = pCommith->iters + tid; - TSKEY nextKey = tsdbNextIterKey(pIter->pIter); - - tsdbResetCommitTable(pCommith); - - // Set commit table - if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { - return -1; - } - - // No disk data and no memory data, just return - if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { - return 0; - } - - // Must has disk data or has memory data - int nBlocks; - int bidx = 0; - SBlock *pBlock; - - if (pCommith->readh.pBlkIdx) { - if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { - return -1; - } - - nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; - } else { - nBlocks = 0; - } - - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - - while (true) { - if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break; - - if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || - (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { - if (tsdbMoveBlock(pCommith, bidx) < 0) { - return -1; - } - - bidx++; - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { - // merge pBlock data and memory data - if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { - return -1; - } - - bidx++; - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - nextKey = tsdbNextIterKey(pIter->pIter); - } else { - // Only commit memory data - if (pBlock == NULL) { - if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { - return -1; - } - } else { - if (tsdbCommitMemData(pCommith, pIter, pBlock->minKey.ts - 1, true) < 0) { - return -1; - } - } - nextKey = tsdbNextIterKey(pIter->pIter); - } - } - - if (tsdbWriteBlockInfo(pCommith) < 0) { - tsdbError("vgId:%d, failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), - TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); - return -1; - } - - return 0; -} - -static int tsdbMoveBlkIdx(SCommitter *pCommith, SBlockIdx *pIdx) { - SReadH *pReadh = &pCommith->readh; - STsdb *pTsdb = TSDB_READ_REPO(pReadh); - STSchema *pTSchema = NULL; - int nBlocks = pIdx->numOfBlocks; - int bidx = 0; - - tsdbResetCommitTable(pCommith); - - pReadh->pBlkIdx = pIdx; - - if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { - return -1; - } - - STable table = {.suid = pIdx->suid, .uid = pIdx->uid, .pSchema = NULL}; - pCommith->pTable = &table; - - while (bidx < nBlocks) { - if (!pTSchema && !tsdbCommitIsSameFile(pCommith, bidx)) { - // Set commit table - pTSchema = metaGetTbTSchema(REPO_META(pTsdb), pIdx->uid, -1); // TODO: schema version - if (!pTSchema) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - table.pSchema = pTSchema; - if (tsdbSetCommitTable(pCommith, &table) < 0) { - taosMemoryFreeClear(pTSchema); - return -1; - } - } - - if (tsdbMoveBlock(pCommith, bidx) < 0) { - tsdbError("vgId:%d, failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), - TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); - taosMemoryFreeClear(pTSchema); - return -1; - } - - ++bidx; - } - - if (tsdbWriteBlockInfo(pCommith) < 0) { - tsdbError("vgId:%d, failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), - TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); - taosMemoryFreeClear(pTSchema); - return -1; - } - - taosMemoryFreeClear(pTSchema); - return 0; -} - -static int tsdbSetCommitTable(SCommitter *pCommith, STable *pTable) { - STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_COMMIT_REPO(pCommith), pTable, false, false, -1); - - pCommith->pTable = pTable; - - if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if (pCommith->isRFileSet) { - if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) { - return -1; - } - } else { - pCommith->readh.pBlkIdx = NULL; - } - return 0; -} - -static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { - TSKEY key = *(TSKEY *)arg1; - SBlock *pBlock = (SBlock *)arg2; - - if (key < pBlock->minKey.ts) { - return -1; - } else if (key > pBlock->maxKey.ts) { - return 1; - } else { - return 0; - } -} - -/** - * @brief Write SDataCols to data file. - * - * @param pRepo - * @param pTable - * @param pDFile - * @param pDFileAggr - * @param pDataCols The pDataCols would be generated from mem/imem directly with 2 bits bitmap or from tsdbRead - * interface with 1 bit bitmap. - * @param pBlock - * @param isLast - * @param isSuper - * @param ppBuf - * @param ppCBuf - * @param ppExBuf - * @return int - */ -static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, - SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) - { - STsdbCfg *pCfg = REPO_CFG(pRepo); - SBlockData *pBlockData = NULL; - SAggrBlkData *pAggrBlkData = NULL; - STSchema *pSchema = pTable->pSchema; - int64_t offset = 0, offsetAggr = 0; - int rowsToWrite = pDataCols->numOfRows; - - ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRows); - ASSERT((!isLast) || rowsToWrite < pCfg->minRows); - - // Make buffer space - if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) { - return -1; - } - pBlockData = (SBlockData *)(*ppBuf); - - if (tsdbMakeRoom(ppExBuf, tsdbBlockAggrSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) { - return -1; - } - pAggrBlkData = (SAggrBlkData *)(*ppExBuf); - - // Get # of cols not all NULL(not including key column) - col_id_t nColsNotAllNull = 0; - col_id_t nColsOfBlockSma = 0; - for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) { // ncol from 1, we skip the timestamp column - STColumn *pColumn = pSchema->columns + ncol; - SDataCol *pDataCol = pDataCols->cols + ncol; - SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; - SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsOfBlockSma; - - if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it - continue; - } - - memset(pBlockCol, 0, sizeof(*pBlockCol)); - memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol)); - - pBlockCol->colId = pDataCol->colId; - pBlockCol->type = pDataCol->type; - pAggrBlkCol->colId = pDataCol->colId; - - if (isSuper && IS_BSMA_ON(pColumn) && tDataTypes[pDataCol->type].statisFunc) { -#if 0 - (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), - &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), - &(pBlockCol->numOfNull)); -#endif - (*tDataTypes[pDataCol->type].statisFunc)(pDataCols->bitmapMode, pDataCol->pBitmap, pDataCol->pData, - rowsToWrite, - &(pAggrBlkCol->min), &(pAggrBlkCol->max), &(pAggrBlkCol->sum), - &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex), - &(pAggrBlkCol->numOfNull)); - - if (pAggrBlkCol->numOfNull == 0) { - pBlockCol->blen = 0; - } else { - pBlockCol->blen = 1; - } - ++nColsOfBlockSma; - } else if (tdIsBitmapBlkNorm(pDataCol->pBitmap, rowsToWrite, pDataCols->bitmapMode)) { - // check if all rows normal - pBlockCol->blen = 0; - } else { - pBlockCol->blen = 1; - } - - ++nColsNotAllNull; - } - - ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); - - // Compress the data if neccessary - int tcol = 0; // counter of not all NULL and written columns - uint32_t toffset = 0; - int32_t tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest); - int32_t lsize = tsize; - uint32_t tsizeAggr = (uint32_t)tsdbBlockAggrSize(nColsOfBlockSma, SBlockVerLatest); - int32_t keyLen = 0; - int32_t nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite); - int32_t sBitmaps = isSuper ? (int32_t)TD_BITMAP_BYTES_I(rowsToWrite) : nBitmaps; - - for (int ncol = 0; ncol < pDataCols->numOfCols; ++ncol) { - // All not NULL columns finish - if (ncol != 0 && tcol >= nColsNotAllNull) break; - - SDataCol *pDataCol = pDataCols->cols + ncol; - SBlockCol *pBlockCol = pBlockData->cols + tcol; - - if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; - - int32_t flen; // final length - int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite, pDataCols->bitmapMode); - -#ifdef TD_SUPPORT_BITMAP - int32_t tBitmaps = 0; - int32_t tBitmapsLen = 0; - if ((ncol != 0) && (pBlockCol->blen > 0)) { - tBitmaps = isSuper ? sBitmaps : nBitmaps; - } -#endif - - void *tptr, *bptr; - - // Make room - if (tsdbMakeRoom(ppBuf, lsize + tlen + tBitmaps + 2 * COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) { - return -1; - } - pBlockData = (SBlockData *)(*ppBuf); - pBlockCol = pBlockData->cols + tcol; - tptr = POINTER_SHIFT(pBlockData, lsize); - - if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { - return -1; - } - - // Compress or just copy - if (pCfg->compression) { -#if 0 - flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite + tBitmaps, tptr, - tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, - tlen + COMP_OVERFLOW_BYTES); -#endif - flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, - tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, - tlen + COMP_OVERFLOW_BYTES); - if (tBitmaps > 0) { - bptr = POINTER_SHIFT(pBlockData, lsize + flen); - if (isSuper && !tdDataColsIsBitmapI(pDataCols)) { - tdMergeBitmap((uint8_t *)pDataCol->pBitmap, rowsToWrite, (uint8_t *)pDataCol->pBitmap); - } - tBitmapsLen = - tsCompressTinyint((char *)pDataCol->pBitmap, tBitmaps, tBitmaps, bptr, tBitmaps + COMP_OVERFLOW_BYTES, - pCfg->compression, *ppCBuf, tBitmaps + COMP_OVERFLOW_BYTES); - TASSERT((tBitmapsLen > 0) && (tBitmapsLen <= (tBitmaps + COMP_OVERFLOW_BYTES))); - flen += tBitmapsLen; - } - } else { - flen = tlen; - memcpy(tptr, pDataCol->pData, flen); - if (tBitmaps > 0) { - bptr = POINTER_SHIFT(pBlockData, lsize + flen); - if (isSuper && !tdDataColsIsBitmapI(pDataCols)) { - tdMergeBitmap((uint8_t *)pDataCol->pBitmap, rowsToWrite, (uint8_t *)pDataCol->pBitmap); - } - memcpy(bptr, pDataCol->pBitmap, tBitmaps); - tBitmapsLen = tBitmaps; - flen += tBitmapsLen; - } - } - - // Add checksum - ASSERT(flen > 0); - ASSERT(tBitmapsLen <= 1024); - flen += sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); - tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM))); - - if (ncol != 0) { - pBlockCol->offset = toffset; - pBlockCol->len = flen; // data + bitmaps - pBlockCol->blen = tBitmapsLen; - ++tcol; - } else { - keyLen = flen; - } - - toffset += flen; - lsize += flen; - } - - pBlockData->delimiter = TSDB_FILE_DELIMITER; - pBlockData->uid = pTable->uid; - pBlockData->numOfCols = nColsNotAllNull; - - taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); - tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM))); - - // Write the whole block to file - if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) { - return -1; - } - - uint32_t aggrStatus = nColsOfBlockSma > 0 ? 1 : 0; - if (aggrStatus > 0) { - taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr); - tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM))); - - // Write the whole block to file - if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) { - return -1; - } - } - - // Update pBlock membership variables - pBlock->last = isLast; - pBlock->offset = offset; - pBlock->algorithm = pCfg->compression; - pBlock->numOfRows = rowsToWrite; - pBlock->len = lsize; - pBlock->keyLen = keyLen; - pBlock->numOfSubBlocks = isSuper ? 1 : 0; - pBlock->numOfCols = nColsNotAllNull; - pBlock->numOfBSma = nColsOfBlockSma; - pBlock->minKey.ts = dataColsKeyFirst(pDataCols); - pBlock->maxKey.ts = dataColsKeyLast(pDataCols); - pBlock->aggrStat = aggrStatus; - pBlock->blkVer = SBlockVerLatest; - pBlock->aggrOffset = (uint64_t)offsetAggr; - - tsdbDebug("vgId:%d, uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64 - " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, - REPO_ID(pRepo), pTable->uid, TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, - pBlock->numOfCols, pBlock->minKey.ts, pBlock->maxKey.ts); - - return 0; -} - -static int tsdbWriteBlock(SCommitter *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, - bool isSuper) { - return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, - isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols, - pBlock, isLast, isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))), - (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith)))); -} - -static int tsdbWriteBlockInfo(SCommitter *pCommih) { - SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); - SBlockIdx blkIdx; - STable *pTable = TSDB_COMMIT_TABLE(pCommih); - - if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void - **)(&(TSDB_COMMIT_BUF(pCommih))), - &blkIdx) < 0) { - return -1; - } - - if (blkIdx.numOfBlocks == 0) { - return 0; - } - - if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static int tsdbCommitMemData(SCommitter *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg *pCfg = REPO_CFG(pRepo); - SMergeInfo mInfo; - int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); - SDFile *pDFile; - bool isLast; - SBlock block; - - while (true) { - tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, keyLimit, defaultRows, - pCommith->pDataCols, NULL, 0, pCfg->update, &mInfo); - - if (pCommith->pDataCols->numOfRows <= 0) break; - - if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRows) { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isLast = false; - } else { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - isLast = true; - } - - if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; - - if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { - return -1; - } - } - - return 0; -} - -static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx) { - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg *pCfg = REPO_CFG(pRepo); - int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; - SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - TSKEY keyLimit; - int16_t colId = PRIMARYKEY_TIMESTAMP_COL_ID; - SMergeInfo mInfo; - SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; - SBlock block, supBlock; - SDFile *pDFile; - - if (bidx == nBlocks - 1) { - keyLimit = pCommith->maxKey; - } else { - keyLimit = pBlock[1].minKey.ts - 1; - } - - STbDataIter titer = *(pIter->pIter); - if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1, false) < 0) return -1; - - tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, - pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, - &mInfo); - - if (mInfo.nOperations == 0) { - // no new data to insert (all updates denied) - if (tsdbMoveBlock(pCommith, bidx) < 0) { - return -1; - } - *(pIter->pIter) = titer; - } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { - // Ignore the block - ASSERT(0); - *(pIter->pIter) = titer; - } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { - // Add a sub-block - tsdbLoadDataFromCache(TSDB_COMMIT_REPO(pCommith), pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, - pCommith->pDataCols, pCommith->readh.pDCols[0]->cols[0].pData, - pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); - if (pBlock->last) { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - } - - if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; - - if (pBlock->numOfSubBlocks == 1) { - subBlocks[0] = *pBlock; - subBlocks[0].numOfSubBlocks = 0; - } else { - memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), - sizeof(SBlock) * pBlock->numOfSubBlocks); - } - subBlocks[pBlock->numOfSubBlocks] = block; - supBlock = *pBlock; - supBlock.minKey.ts = mInfo.keyFirst; - supBlock.maxKey.ts = mInfo.keyLast; - supBlock.numOfSubBlocks++; - supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; - supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); - - if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; - } else { - if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; - if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return - -1; - } - - return 0; -} - -static bool tsdbCommitIsSameFile(SCommitter *pCommith, int bidx) { - SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - if (pBlock->last) { - return pCommith->isLFileSame; - } - return pCommith->isDFileSame; -} - -static int tsdbMoveBlock(SCommitter *pCommith, int bidx) { - SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - SDFile *pDFile; - SBlock block; - bool isSameFile; - - ASSERT(pBlock->numOfSubBlocks > 0); - - if (pBlock->last) { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - isSameFile = pCommith->isLFileSame; - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isSameFile = pCommith->isDFileSame; - } - - if (isSameFile) { - if (pBlock->numOfSubBlocks == 1) { - if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) { - return -1; - } - } else { - block = *pBlock; - block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk); - - if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), - pBlock->numOfSubBlocks) < 0) { - return -1; - } - } - } else { - if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; - if (tsdbWriteBlock(pCommith, pDFile, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1; - if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; - } - - return 0; -} - -static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int -nSubBlocks) { - if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if (pSubBlocks && taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, - bool isLastOneBlock) { - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg *pCfg = REPO_CFG(pRepo); - SBlock block; - SDFile *pDFile; - bool isLast; - int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); - - int biter = 0; - while (true) { - tsdbLoadAndMergeFromCache(TSDB_COMMIT_REPO(pCommith), pCommith->readh.pDCols[0], &biter, pIter, - pCommith->pDataCols, - keyLimit, defaultRows, pCfg->update); - - if (pCommith->pDataCols->numOfRows == 0) break; - - if (isLastOneBlock) { - if (pCommith->pDataCols->numOfRows < pCfg->minRows) { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - isLast = true; - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isLast = false; - } - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isLast = false; - } - - if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; - if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; - } - - return 0; -} - -static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, - SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update) { - TSKEY key1 = INT64_MAX; - TSKEY key2 = INT64_MAX; - TSKEY lastKey = TSKEY_INITIAL_VAL; - STSchema *pSchema = NULL; - - ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); - tdResetDataCols(pTarget); - - pTarget->bitmapMode = pDataCols->bitmapMode; - // TODO: filter Multi-Version - // TODO: support delete function - while (true) { - key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); - STSRow *row = tsdbNextIterRow(pCommitIter->pIter); - if (row == NULL || TD_ROW_KEY(row) > maxKey) { - key2 = INT64_MAX; - } else { - key2 = TD_ROW_KEY(row); - } - - if (key1 == INT64_MAX && key2 == INT64_MAX) break; - - if (key1 < key2) { - if (lastKey != TSKEY_INITIAL_VAL) { - ++pTarget->numOfRows; - } - for (int i = 0; i < pDataCols->numOfCols; ++i) { - // TODO: dataColAppendVal may fail - SCellVal sVal = {0}; - if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) { - TASSERT(0); - } - tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, - pTarget->bitmapMode, false); - } - - lastKey = key1; - ++(*iter); - } else if (key1 > key2) { - if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { - pSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row)); - ASSERT(pSchema != NULL); - } - - if (key2 == lastKey) { - if (TD_SUPPORT_UPDATE(update)) { - tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); - } - } else { - if (lastKey != TSKEY_INITIAL_VAL) { - ++pTarget->numOfRows; - } - tdAppendSTSRowToDataCol(row, pSchema, pTarget, false); - lastKey = key2; - } - - tsdbTbDataIterNext(pCommitIter->pIter); - } else { - if (lastKey != key1) { - if (lastKey != TSKEY_INITIAL_VAL) { - ++pTarget->numOfRows; - } - lastKey = key1; - } - - // copy disk data - for (int i = 0; i < pDataCols->numOfCols; ++i) { - SCellVal sVal = {0}; - // no duplicated TS keys in pDataCols from file - if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) { - TASSERT(0); - } - // TODO: tdAppendValToDataCol may fail - tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, - pTarget->bitmapMode, false); - } - - if (TD_SUPPORT_UPDATE(update)) { - // copy mem data(Multi-Version) - if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { - pSchema = tsdbGetTableSchemaImpl(pTsdb, pCommitIter->pTable, false, false, TD_ROW_SVER(row)); - ASSERT(pSchema != NULL); - } - - // TODO: merge with Multi-Version - tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); - } - ++(*iter); - tsdbTbDataIterNext(pCommitIter->pIter); - } - - if (pTarget->numOfRows >= (maxRows - 1)) break; - } - - if (lastKey != TSKEY_INITIAL_VAL) { - ++pTarget->numOfRows; - } -} - -static void tsdbResetCommitTable(SCommitter *pCommith) { - taosArrayClear(pCommith->aSubBlk); - taosArrayClear(pCommith->aSupBlk); - pCommith->pTable = NULL; -} - -static void tsdbCloseCommitFile(SCommitter *pCommith, bool hasError) { - if (pCommith->isRFileSet) { - tsdbCloseAndUnsetFSet(&(pCommith->readh)); - } - - if (!hasError) { - TSDB_FSET_FSYNC(TSDB_COMMIT_WRITE_FSET(pCommith)); - } - tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); -} - -static bool tsdbCanAddSubBlock(SCommitter *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { - STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg *pCfg = REPO_CFG(pRepo); - int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; - - ASSERT(mergeRows > 0); - - if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRows) { - if (pBlock->last) { - if (pCommith->isLFileSame && mergeRows < pCfg->minRows) return true; - } else { - if (pCommith->isDFileSame && mergeRows <= pCfg->maxRows) return true; - } - } - - return false; -} - -static int tsdbAppendTableRowToCols(STsdb *pTsdb, STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row, - bool merge) { - if (pCols) { - if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) { - *ppSchema = tsdbGetTableSchemaImpl(pTsdb, pTable, false, false, TD_ROW_SVER(row)); - if (*ppSchema == NULL) { - ASSERT(false); - return -1; - } - } - - tdAppendSTSRowToDataCol(row, *ppSchema, pCols, merge); - } - - return 0; -} - -static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, - SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, - SMergeInfo *pMergeInfo) { - ASSERT(maxRowsToRead > 0 && nFilterKeys >= 0); - if (pIter == NULL) return 0; - STSchema *pSchema = NULL; - TSKEY rowKey = 0; - TSKEY fKey = 0; - // only fetch lastKey from mem data as file data not used in this function actually - TSKEY lastKey = TSKEY_INITIAL_VAL; - bool isRowDel = false; - int filterIter = 0; - STSRow *row = NULL; - SMergeInfo mInfo; - - // TODO: support Multi-Version(the rows with the same TS keys in memory can't be merged if its version refered by - // query handle) - - if (pMergeInfo == NULL) pMergeInfo = &mInfo; - - memset(pMergeInfo, 0, sizeof(*pMergeInfo)); - pMergeInfo->keyFirst = INT64_MAX; - pMergeInfo->keyLast = INT64_MIN; - if (pCols) tdResetDataCols(pCols); - - row = tsdbNextIterRow(pIter); - if (row == NULL || TD_ROW_KEY(row) > maxKey) { - rowKey = INT64_MAX; - isRowDel = false; - } else { - rowKey = TD_ROW_KEY(row); - isRowDel = TD_ROW_IS_DELETED(row); - } - - if (filterIter >= nFilterKeys) { - fKey = INT64_MAX; - } else { - fKey = tdGetKey(filterKeys[filterIter]); - } - // 1. fkey - no dup since merged up to maxVersion of each query handle by tsdbLoadBlockDataCols - // 2. rowKey - would dup since Multi-Version supported - while (true) { - if (fKey == INT64_MAX && rowKey == INT64_MAX) break; - - if (fKey < rowKey) { - pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey); - pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey); - - filterIter++; - if (filterIter >= nFilterKeys) { - fKey = INT64_MAX; - } else { - fKey = tdGetKey(filterKeys[filterIter]); - } -#if 1 - } else if (fKey > rowKey) { - if (isRowDel) { - // TODO: support delete function - pMergeInfo->rowsDeleteFailed++; - } else { - if (pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed >= maxRowsToRead) break; - if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; - - if (lastKey != rowKey) { - pMergeInfo->rowsInserted++; - pMergeInfo->nOperations++; - pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); - pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); - if (pCols) { - if (lastKey != TSKEY_INITIAL_VAL) { - ++pCols->numOfRows; - } - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); - } - lastKey = rowKey; - } else { - if (keepDup) { - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true); - } else { - // discard - } - } - } - - tsdbTbDataIterNext(pIter); - row = tsdbNextIterRow(pIter); - if (row == NULL || TD_ROW_KEY(row) > maxKey) { - rowKey = INT64_MAX; - isRowDel = false; - } else { - rowKey = TD_ROW_KEY(row); - isRowDel = TD_ROW_IS_DELETED(row); - } - } else { // fkey == rowKey - if (isRowDel) { // TODO: support delete function(How to stands for delete in file? rowVersion = -1?) - ASSERT(!keepDup); - if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; - pMergeInfo->rowsDeleteSucceed++; - pMergeInfo->nOperations++; - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); - } else { - if (keepDup) { - if (pCols && pMergeInfo->nOperations >= pCols->maxPoints) break; - if (lastKey != rowKey) { - pMergeInfo->rowsUpdated++; - pMergeInfo->nOperations++; - pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, rowKey); - pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, rowKey); - if (pCols) { - if (lastKey != TSKEY_INITIAL_VAL) { - ++pCols->numOfRows; - } - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, false); - } - lastKey = rowKey; - } else { - tsdbAppendTableRowToCols(pTsdb, pTable, pCols, &pSchema, row, true); - } - } else { - pMergeInfo->keyFirst = TMIN(pMergeInfo->keyFirst, fKey); - pMergeInfo->keyLast = TMAX(pMergeInfo->keyLast, fKey); - } - } - - tsdbTbDataIterNext(pIter); - row = tsdbNextIterRow(pIter); - if (row == NULL || TD_ROW_KEY(row) > maxKey) { - rowKey = INT64_MAX; - isRowDel = false; - } else { - rowKey = TD_ROW_KEY(row); - isRowDel = TD_ROW_IS_DELETED(row); - } - - filterIter++; - if (filterIter >= nFilterKeys) { - fKey = INT64_MAX; - } else { - fKey = tdGetKey(filterKeys[filterIter]); - } - } -#endif - } - if (pCols && (lastKey != TSKEY_INITIAL_VAL)) { - ++pCols->numOfRows; - } - - return 0; -} -#endif \ No newline at end of file +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 3d98f186371a251364ce76ab1416936e6b433d66..04990f6396feccedcf9652e27bea3bbc6b4b7d58 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -239,6 +239,8 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) { SMemSkipListNode *pHead = pIter->pTbData->sl.pHead; SMemSkipListNode *pTail = pIter->pTbData->sl.pTail; + if (pIter == NULL) return false; + if (pIter->backward) { ASSERT(pIter->pNode != pTail); @@ -271,6 +273,8 @@ bool tsdbTbDataIterGet(STbDataIter *pIter, TSDBROW *pRow) { SMemSkipListNode *pTail = pIter->pTbData->sl.pTail; TSDBROW row = {0}; + if (pIter == NULL) return false; + if (pRow == NULL) { pRow = &row; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 90f021b36776c702aea7e92c8da01172de655fb0..df16da593212b93d77848f6797c379601f3e5c93 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -715,3 +715,11 @@ int32_t tGetDelFileHdr(uint8_t *p, SDelFile *pDelFile) { return n; } + +int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) { + if (key < 0) { + return (int)((key + 1) / tsTickPerMin[precision] / minutes - 1); + } else { + return (int)((key / tsTickPerMin[precision] / minutes)); + } +} \ No newline at end of file