diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 2127b74d21a66e2f551518c26b9945a7603e2856..3db4ba7a01480e2a03db7a469ac5567605e27193 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -241,7 +241,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "No table d TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "File already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "Need to reconfigure table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, 0, 0x0612, "Invalid information to create table") -TAOS_DEFINE_ERROR(TSDB_TDB_NO_AVAIL_DISK, 0, 0x0613, "No available disk") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, 0, 0x0613, "No available disk") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "Invalid handle") diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index 09f63e97d6366beca2c6aff4685deded52bc5473..766aa78850c66028421b168b8f038ad6bad4066f 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -207,6 +207,22 @@ static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm) { pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM)); } +static FORCE_INLINE int tsdbCreateAndOpenDFile(SDFile* pDFile) { + if (tsdbOpenDFile(pDFile, O_WRONLY | O_CREAT | O_EXCL) < 0) { + return -1; + } + + pDFile->info.size += TSDB_FILE_HEAD_SIZE; + + if (tsdbUpdaeDFileHeader(pDFile) < 0) { + tsdbCloseDFile(pDFile); + remove(TSDB_FILE_FULL_NAME(pDFile)); + return -1; + } + + return 0; +} + // =============== SDFileSet typedef struct { int fid; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 26fd55efa3774f2eab5fefe07c95586a8d0dae11..c894cec442240a745db9a488ccbef6c5f285d251 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -26,14 +26,16 @@ typedef struct { } SRtn; typedef struct { - int version; - SRtn rtn; // retention snapshot - bool isRFileSet; - SReadH readh; + uint32_t version; + SRtn rtn; // retention snapshot SFSIter fsIter; // tsdb file iterator int niters; // memory iterators SCommitIter *iters; - SDFileSet wSet; // commit file + bool isRFileSet; // read and commit FSET + SReadH readh; + SDFileSet wSet; + bool isDFileSame; + bool isLFileSame; TSKEY minKey; TSKEY maxKey; SArray * aBlkIdx; // SBlockIdx array @@ -138,7 +140,7 @@ _err: static int tsdbCommitTSData(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; STsdbCfg * pCfg = REPO_CFG(pRepo); - SCommitH ch = {0}; + SCommitH commith = {0}; SDFileSet *pSet = NULL; SDFileSet nSet; int fid; @@ -146,19 +148,19 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { if (pMem->numOfRows <= 0) return 0; // Resource initialization - if (tsdbInitCommitH(pRepo, &ch) < 0) { + if (tsdbInitCommitH(&commith, pRepo) < 0) { return -1; } // Skip expired memory data and expired FSET - tsdbSeekCommitIter(&ch, ch.rtn.minKey); + tsdbSeekCommitIter(&commith, commith.rtn.minKey); while (true) { - pSet = tsdbFSIterNext(&(ch.fsIter)); - if (pSet == NULL || pSet->fid >= ch.rtn.minFid) break; + pSet = tsdbFSIterNext(&(commith.fsIter)); + if (pSet == NULL || pSet->fid >= commith.rtn.minFid) break; } // Loop to commit to each file - fid = tsdbNextCommitFid(&(ch)); + fid = tsdbNextCommitFid(&(commith)); while (true) { // Loop over both on disk and memory if (pSet == NULL && fid == TSDB_IVLD_FID) break; @@ -168,31 +170,32 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { // existing FSET, only check if file in correct retention int level, id; - tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ch.rtn)), &level, &id); + tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(commith.rtn)), &level, &id); if (level == TFS_UNDECIDED_LEVEL) { - terrno = TSDB_TDB_NO_AVAIL_DISK; - tsdbDestroyCommitH(&ch); + terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + tsdbDestroyCommitH(&commith); return -1; } if (level > TSDB_FSET_LEVEL(pSet)) { + // Need to move the FSET to higher level if (tsdbCopyDFileSet(*pSet, level, id, &nSet) < 0) { - tsdbDestroyCommitH(&ch); + tsdbDestroyCommitH(&commith); return -1; } if (tsdbUpdateDFileSet(pRepo, &nSet) < 0) { - tsdbDestroyCommitH(&ch); + tsdbDestroyCommitH(&commith); return -1; } } else { if (tsdbUpdateDFileSet(pRepo, pSet) < 0) { - tsdbDestroyCommitH(&ch); + tsdbDestroyCommitH(&commith); return -1; } } - pSet = tsdbFSIterNext(&(ch.fsIter)); + pSet = tsdbFSIterNext(&(commith.fsIter)); } else { // Has memory data to commit SDFileSet *pCSet; @@ -206,18 +209,18 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { // Commit to an existing FSET pCSet = pSet; cfid = pSet->fid; - pSet = tsdbFSIterNext(&(ch.fsIter)); + pSet = tsdbFSIterNext(&(commith.fsIter)); } - fid = tsdbNextCommitFid(&ch); + fid = tsdbNextCommitFid(&commith); - if (tsdbCommitToFile(pCSet, &ch, cfid) < 0) { - tsdbDestroyCommitH(&ch); + if (tsdbCommitToFile(pCSet, &commith, cfid) < 0) { + tsdbDestroyCommitH(&commith); return -1; } } } - tsdbDestroyCommitH(&ch); + tsdbDestroyCommitH(&commith); return 0; } @@ -260,7 +263,6 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS } static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { - int level, id; STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg * pCfg = REPO_CFG(pRepo); @@ -269,45 +271,13 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { tsdbResetCommitFile(pCommith); tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); - tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id); - if (level == TFS_UNDECIDED_LEVEL) { - terrno = TSDB_TDB_NO_AVAIL_DISK; - return -1; - } - - // Set commit file - if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) { - tsdbInitDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith), REPO_ID(pRepo), fid, pCommith->version, level, id); - } else { - level = TSDB_FSET_LEVEL(pSet); - id = TSDB_FSET_ID(pSet); - - // TSDB_FILE_HEAD - SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); - tsdbInitDFile(pWHeadf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_HEAD); - - // TSDB_FILE_DATA - SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); - SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); - tsdbInitDFileWithOld(pWDataf, pRDataf); - - // TSDB_FILE_LAST - SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh)); - SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith); - if (pRLastf->info.size < 32 * 1024) { - tsdbInitDFileWithOld(pWLastf, pRLastf); - } else { - tsdbInitDFile(pWLastf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_LAST); - } - } - - // Open commit file - if (tsdbOpenCommitFile(pCommith, pSet) < 0) { + // Set and open files + if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { return -1; } // Loop to commit each table data - for (int tid = 0; tid < pCommith->niters; tid++) { + for (int tid = 1; tid < pCommith->niters; tid++) { SCommitIter *pIter = pCommith->iters + tid; if (pIter->pTable == NULL) continue; @@ -354,7 +324,8 @@ static SCommitIter *tsdbCreateCommitIters(SCommitH *pCommith) { if (tsdbUnlockRepoMeta(pRepo) < 0) return -1; for (int i = 0; i < pMem->maxTables; i++) { - if ((pCommith->iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(pCommith->iters[i].pTable) == pMem->tData[i]->uid)) { + if ((pCommith->iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && + (TABLE_UID(pCommith->iters[i].pTable) == pMem->tData[i]->uid)) { if ((pCommith->iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; @@ -379,20 +350,20 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { free(pCommith->iters); pCommith->iters = NULL; + pCommith->niters = 0; } // Skip all keys until key (not included) static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { for (int i = 0; i < pCommith->niters; i++) { SCommitIter *pIter = pCommith->iters + i; - if (pIter->pTable == NULL) continue; - if (pIter->pIter == NULL) continue; + if (pIter->pTable == NULL || pIter->pIter == NULL) continue; tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, true, NULL); } } -static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pCommith) { +static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) { STsdbCfg *pCfg = REPO_CFG(pRepo); memset(pCommith, 0, sizeof(*pCommith)); @@ -404,6 +375,11 @@ static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pCommith) { tsdbGetRtnSnap(pRepo, &(pCommith->rtn)); + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + SDFile *pDFile = TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype); + TSDB_FILE_SET_CLOSED(pDFile); + } + // Init read handle if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) { return -1; @@ -489,13 +465,12 @@ static int tsdbGetFidLevel(int fid, SRtn *pRtn) { } static int tsdbNextCommitFid(SCommitH *pCommith) { - SCommitIter *pIter; - STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg * pCfg = REPO_CFG(pRepo); - int fid = TSDB_IVLD_FID; + STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + int fid = TSDB_IVLD_FID; for (int i = 0; i < pCommith->niters; i++) { - pIter = pCommith->iters + i; + SCommitIter *pIter = pCommith->iters + i; if (pIter->pTable == NULL || pIter->pIter == NULL) continue; TSKEY nextKey = tsdbNextIterKey(pIter->pIter); @@ -514,7 +489,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) { static int tsdbCommitToTable(SCommitH *pCommith, int tid) { SCommitIter *pIter = pCommith->iters + tid; - if (pIter->pTable == NULL) return 0; + TSKEY nextKey = tsdbNextIterKey(pIter->pIter); TSDB_RLOCK_TABLE(pIter->pTable); @@ -525,6 +500,82 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { return -1; } + if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { + // No disk data and no memory data + TSDB_RUNLOCK_TABLE(pIter->pTable); + return 0; + } else { + // Must has disk data, maybe has memory data + int nBlocks; + int bidx = 0; + SBlock *pBlock; + + if (pCommith->readh.pBlkIdx) { + if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + + nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; + } else { + nBlocks = 0; + } + + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + } else { + pBlock = NULL; + } + + while (true) { + if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break; + + if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || + (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { + if (tsdbMoveBlock(pCommith, bidx) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + + bidx++; + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + } else { + pBlock = NULL; + } + } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { + // merge pBlock data and memory data + if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + + bidx++; + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + } else { + pBlock = NULL; + } + nextKey = tsdbNextIterKey(pIter->pIter); + } else { + // Only commit memory data + if (pBlock == NULL) { + if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + } else { + if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } + } + nextKey = tsdbNextIterKey(pIter->pIter); + } + } + } + +#if 0 if (!pCommith->isRFileSet) { if (pIter->pIter == NULL) { // No memory data @@ -607,7 +658,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { } nextKey = tsdbNextIterKey(pIter->pIter); } else { - if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst-1, true) < 0) { + if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } @@ -615,6 +666,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { } } } +#endif TSDB_RUNLOCK_TABLE(pIter->pTable); @@ -635,6 +687,8 @@ static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) { return -1; } + } else { + pCommith->readh.pBlkIdx = NULL; } return 0; } @@ -973,7 +1027,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { // Ignore the block ASSERT(0); *(pIter->pIter) = titer; - } else if (tsdbCanAddSubBlock()) { + } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { // Add a sub-block tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, @@ -1011,12 +1065,12 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { } static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { - SBlock *pBlock = pCommith->readh.pBlkInfo->blocks+bidx; + SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; SDFile *pCommitF = (pBlock->last) ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith); SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh)); SBlock block; - if (tfsIsSameFile(&(pCommitF->f), &(pReadF->f))) { + if ((pBlock->last && pCommith->isLFileSame) || ((!pBlock->last) && pCommith->isDFileSame)) { if (pBlock->numOfSubBlocks == 1) { if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) return -1; } else { @@ -1158,7 +1212,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt } static void tsdbResetCommitFile(SCommitH *pCommith) { - tsdbResetCommitTable(pCommith); + pCommith->isRFileSet = false; + pCommith->isDFileSame = false; + pCommith->isLFileSame = false; taosArrayClear(pCommith->aBlkIdx); } @@ -1168,18 +1224,110 @@ static void tsdbResetCommitTable(SCommitH *pCommith) { taosArrayClear(pCommith->aSupBlk); } -static int tsdbOpenCommitFile(SCommitH *pCommith, SDFileSet *pRSet) { - if (pRSet == NULL) { - pCommith->isRFileSet = false; - } else { - pCommith->isRFileSet = true; - if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pRSet) < 0) { +static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { + int level, id; + SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); + + tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id); + if (level == TFS_UNDECIDED_LEVEL) { + terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + return -1; + } + + // Open read FSET + if (pSet) { + if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) { return -1; } - } - if (tsdbOpenDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith), O_WRONLY | O_CREAT) < 0) { + pCommith->isRFileSet = true; + + if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + } return -1; + } else { + pCommith->isRFileSet = false; + } + + // Set and open commit FSET + if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) { + // Create new FSET + tsdbInitDFileSet(pWSet, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, level, id); + + if (tsdbOpenDFileSet(pWSet, O_WRONLY | O_CREAT) < 0) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + remove(TSDB_FILE_FULL_NAME(pWSet, ftype)); + } + + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + } + return -1; + } + + if (tsdbUpdateDFileSetHeader(pWSet) < 0) { + tsdbCloseDFileSet(pWSet); + + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + remove(TSDB_FILE_FULL_NAME(pWSet, ftype)); + } + + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + } + + return -1; + } + + // TODO: update file info; + } else { + level = TSDB_FSET_LEVEL(pSet); + id = TSDB_FSET_ID(pSet); + + // TSDB_FILE_HEAD + SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); + tsdbInitDFile(pWHeadf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_HEAD); + if (tsdbCreateAndOpenDFile(pWHeadf) < 0) { + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + + // TSDB_FILE_DATA + SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); + SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); + tsdbInitDFileWithOld(pWHeadf, pRDataf); + if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { + tsdbCloseDFile(pWHeadf); + remove(TSDB_FILE_FULL_NAME(pWHeadf)); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + pCommith->isDFileSame = true; + + // TSDB_FILE_LAST + SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh)); + SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith); + if (pRLastf->info.size < 32 * 1024) { + tsdbInitDFileWithOld(pWLastf, pRLastf); + pCommith->isLFileSame = true; + } else { + tsdbInitDFile(pWLastf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_LAST); + pCommith->isLFileSame = false; + } + if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { + tsdbCloseDFile(pWDataf); + tsdbCloseDFile(pWHeadf); + remove(TSDB_FILE_FULL_NAME(pWHeadf)); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } } return 0; @@ -1197,4 +1345,22 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { } } tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); +} + +static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { + STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; + + ASSERT(mergeRows > 0); + + if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { + if (pBlock->last) { + if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; + } else { + if (mergeRows < pCfg->maxRowsPerFileBlock) return true; + } + } + + return false; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 73e12493fd5c8a665cd98c7ed1c127b12081b550..5a9557ff8f2f328c85188094a25d611860488eba 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -82,7 +82,8 @@ static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) { } // ============== Operations on SDFile -void tsdbInitDFile(SDFile *pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo *pInfo, TSDB_FILE_T ftype) { +void tsdbInitDFile(SDFile *pDFile, int vid, int fid, uint32_t ver, int level, int id, const SDFInfo *pInfo, + TSDB_FILE_T ftype) { char fname[TSDB_FILENAME_LEN]; TSDB_FILE_SET_CLOSED(pDFile); @@ -158,7 +159,7 @@ static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) { } // ============== Operations on SDFileSet -void tsdbInitDFileSet(SDFileSet *pSet, int vid, int fid, int ver, int level, int id) { +void tsdbInitDFileSet(SDFileSet *pSet, int vid, int fid, uint32_t ver, int level, int id) { pSet->fid = fid; pSet->state = 0; @@ -201,7 +202,7 @@ int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet *pDest) { ASSERT(tolevel > TSDB_FSET_LEVEL(&src)); for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { - if (tsdbCopyDFile(TSDB_DFILE_IN_SET(&src, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) { + if (tsdbCopyDFile(TSDB_DFILE_IN_SET(&src, ftype), tolevel, toid, TSDB_DFILE_IN_SET(pDest, ftype)) < 0) { while (ftype >= 0) { remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pDest, ftype))); ftype--; @@ -214,20 +215,20 @@ int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet *pDest) { return 0; } -static void tsdbGetFilename(int vid, int fid, int64_t ver, TSDB_FILE_T ftype, char *fname) { +static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) { ASSERT(ftype != TSDB_FILE_MAX); if (ftype < TSDB_FILE_MAX) { if (ver == 0) { snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]); } else { - snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRId64, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver); + snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRIu32, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver); } } else { if (ver == 0) { snprintf(fname, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]); } else { - snprintf(fname, "vnode/vnode%d/tsdb/%s-%012" PRId64, vid, TSDB_FNAME_SUFFIX[ftype], ver); + snprintf(fname, "vnode/vnode%d/tsdb/%s-%012" PRIu32, vid, TSDB_FNAME_SUFFIX[ftype], ver); } } } \ No newline at end of file