/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "tsdb.h" #define TSDB_MAX_SUBBLOCKS 8 typedef struct { STable *pTable; SSkipListIterator *pIter; } SCommitIter; typedef struct { SRtn rtn; // retention snapshot SFSIter fsIter; // tsdb file iterator int niters; // memory iterators SCommitIter *iters; bool isRFileSet; // read and commit FSET SReadH readh; SDFileSet wSet; bool isDFileSame; bool isLFileSame; TSKEY minKey; TSKEY maxKey; SArray *aBlkIdx; // SBlockIdx array STable *pTable; SArray *aSupBlk; // Table super-block array SArray *aSubBlk; // table sub-block array SDataCols *pDataCols; } SCommitH; #define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) #define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) #define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh))) #define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet)) #define TSDB_COMMIT_TABLE(ch) ((ch)->pTable) #define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) #define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) #define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD) #define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL) #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) #define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh)) #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) static void tsdbStartCommit(STsdb *pRepo); static void tsdbEndCommit(STsdb *pTsdb, int eno); static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo); static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); static int tsdbNextCommitFid(SCommitH *pCommith); static void tsdbDestroyCommitH(SCommitH *pCommith); static int tsdbCreateCommitIters(SCommitH *pCommith); static void tsdbDestroyCommitIters(SCommitH *pCommith); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static void tsdbResetCommitFile(SCommitH *pCommith); static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCommitToTable(SCommitH *pCommith, int tid); static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx); static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); static int tsdbComparKeyBlock(const void *arg1, const void *arg2); static int tsdbWriteBlockInfo(SCommitH *pCommih); static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); static int tsdbMoveBlock(SCommitH *pCommith, int bidx); static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock); static void tsdbResetCommitTable(SCommitH *pCommith); static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { SDiskID did; SDFileSet nSet = {0}; STsdbFS *pfs = REPO_FS(pRepo); int level; ASSERT(pSet->fid >= pRtn->minFid); level = tsdbGetFidLevel(pSet->fid, pRtn); if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; return -1; } if (did.level > TSDB_FSET_LEVEL(pSet)) { // Need to move the FSET to higher level tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs)); if (tsdbCopyDFileSet(pSet, &nSet) < 0) { tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno)); return -1; } if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { return -1; } tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id); } else { // On a correct level if (tsdbUpdateDFileSet(pfs, pSet) < 0) { return -1; } } return 0; } int tsdbPrepareCommit(STsdb *pTsdb) { if (pTsdb->mem == NULL) return 0; ASSERT(pTsdb->imem == NULL); pTsdb->imem = pTsdb->mem; pTsdb->mem = NULL; return 0; } int tsdbCommit(STsdb *pRepo) { SCommitH commith = {0}; SDFileSet *pSet = NULL; int fid; // if (pRepo->imem == NULL) return 0; pRepo->imem = pRepo->mem; pRepo->mem = NULL; tsdbStartCommit(pRepo); // Resource initialization if (tsdbInitCommitH(&commith, pRepo) < 0) { return -1; } // Skip expired memory data and expired FSET tsdbSeekCommitIter(&commith, commith.rtn.minKey); while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) { if (pSet->fid < commith.rtn.minFid) { tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); } else { break; } } // Loop to commit to each file fid = tsdbNextCommitFid(&(commith)); while (true) { // Loop over both on disk and memory if (pSet == NULL && fid == TSDB_IVLD_FID) break; if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) { // Only has existing FSET but no memory data to commit in this // existing FSET, only check if file in correct retention if (tsdbApplyRtnOnFSet(pRepo, pSet, &(commith.rtn)) < 0) { tsdbDestroyCommitH(&commith); return -1; } pSet = tsdbFSIterNext(&(commith.fsIter)); } else { // Has memory data to commit SDFileSet *pCSet; int cfid; if (pSet == NULL || pSet->fid > fid) { // Commit to a new FSET with fid: fid pCSet = NULL; cfid = fid; } else { // Commit to an existing FSET pCSet = pSet; cfid = pSet->fid; pSet = tsdbFSIterNext(&(commith.fsIter)); } if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) { tsdbDestroyCommitH(&commith); return -1; } fid = tsdbNextCommitFid(&commith); } } tsdbDestroyCommitH(&commith); tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); return 0; } void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); TSKEY minKey, midKey, maxKey, now; now = taosGetTimestamp(pCfg->precision); minKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision]; midKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision]; maxKey = now - pCfg->keep0 * tsTickPerDay[pCfg->precision]; pRtn->minKey = minKey; pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->days, pCfg->precision)); pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->days, pCfg->precision)); pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->days, pCfg->precision)); tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey, pRtn->minFid, pRtn->midFid, pRtn->maxFid); } static void tsdbStartCommit(STsdb *pRepo) { STsdbMemTable *pMem = pRepo->imem; tsdbInfo("vgId:%d start to commit", REPO_ID(pRepo)); tsdbStartFSTxn(pRepo, 0, 0); } static void tsdbEndCommit(STsdb *pTsdb, int eno) { tsdbEndFSTxn(pTsdb); tsdbMemTableDestroy(pTsdb, pTsdb->imem); pTsdb->imem = NULL; tsdbInfo("vgId:%d commit over, %s", REPO_ID(pTsdb), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); } static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) { STsdbCfg *pCfg = REPO_CFG(pRepo); memset(pCommith, 0, sizeof(*pCommith)); tsdbGetRtnSnap(pRepo, &(pCommith->rtn)); TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith)); // Init read handle if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) { return -1; } // Init file iterator tsdbFSIterInit(&(pCommith->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); if (tsdbCreateCommitIters(pCommith) < 0) { tsdbDestroyCommitH(pCommith); return -1; } pCommith->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); if (pCommith->aBlkIdx == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyCommitH(pCommith); return -1; } pCommith->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); if (pCommith->aSupBlk == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyCommitH(pCommith); return -1; } pCommith->aSubBlk = taosArrayInit(1024, sizeof(SBlock)); if (pCommith->aSubBlk == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyCommitH(pCommith); return -1; } pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRows); if (pCommith->pDataCols == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbDestroyCommitH(pCommith); return -1; } return 0; } // Skip all keys until key (not included) static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { for (int i = 0; i < pCommith->niters; i++) { SCommitIter *pIter = pCommith->iters + i; if (pIter->pTable == NULL || pIter->pIter == NULL) continue; tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, true, NULL); } } static int tsdbNextCommitFid(SCommitH *pCommith) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); int fid = TSDB_IVLD_FID; for (int i = 0; i < pCommith->niters; i++) { SCommitIter *pIter = pCommith->iters + i; // if (pIter->pTable == NULL || pIter->pIter == NULL) continue; TSKEY nextKey = tsdbNextIterKey(pIter->pIter); if (nextKey == TSDB_DATA_TIMESTAMP_NULL) { continue; } else { int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->days, pCfg->precision)); if (fid == TSDB_IVLD_FID || fid > tfid) { fid = tfid; } } } return fid; } static void tsdbDestroyCommitH(SCommitH *pCommith) { pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols); pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk); pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk); pCommith->aBlkIdx = taosArrayDestroy(pCommith->aBlkIdx); tsdbDestroyCommitIters(pCommith); tsdbDestroyReadH(&(pCommith->readh)); tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); ASSERT(pSet == NULL || pSet->fid == fid); tsdbResetCommitFile(pCommith); tsdbGetFidKeyRange(pCfg->days, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); // Set and open files if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { return -1; } #if 0 // Loop to commit each table data for (int tid = 0; tid < pCommith->niters; tid++) { SCommitIter *pIter = pCommith->iters + tid; if (pIter->pTable == NULL) continue; if (tsdbCommitToTable(pCommith, tid) < 0) { tsdbCloseCommitFile(pCommith, true); // revert the file change tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); return -1; } } #endif // Loop to commit each table data in mem and file int mIter = 0, fIter = 0; int nBlkIdx = taosArrayGetSize(pCommith->readh.aBlkIdx); while (true) { SBlockIdx *pIdx = NULL; SCommitIter *pIter = NULL; if (mIter < pCommith->niters) { pIter = pCommith->iters + mIter; if (fIter < nBlkIdx) { pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter); } } else if (fIter < nBlkIdx) { pIdx = taosArrayGet(pCommith->readh.aBlkIdx, fIter); } else { break; } if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->uid <= pIdx->uid))) { if (tsdbCommitToTable(pCommith, mIter) < 0) { tsdbCloseCommitFile(pCommith, true); // revert the file change tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); return -1; } if (pIdx && (pIter->pTable->uid == pIdx->uid)) { ++fIter; } ++mIter; } else if (pIter && !pIter->pTable) { // When table already dropped during commit, pIter is not NULL but pIter->pTable is NULL. ++mIter; // skip the table and do nothing } else if (pIdx) { if (tsdbMoveBlkIdx(pCommith, pIdx) < 0) { tsdbCloseCommitFile(pCommith, true); // revert the file change tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); return -1; } ++fIter; } } if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < 0) { tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); tsdbCloseCommitFile(pCommith, true); // revert the file change tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); return -1; } if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) { tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); tsdbCloseCommitFile(pCommith, true); // revert the file change tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); return -1; } // Close commit file tsdbCloseCommitFile(pCommith, false); if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { return -1; } return 0; } static int tsdbCreateCommitIters(SCommitH *pCommith) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbMemTable *pMem = pRepo->imem; SSkipListIterator *pSlIter; SCommitIter *pCommitIter; SSkipListNode *pNode; STbData *pTbData; STSchema *pTSchema = NULL; pCommith->niters = SL_SIZE(pMem->pSlIdx); pCommith->iters = (SCommitIter *)taosMemoryCalloc(pCommith->niters, sizeof(SCommitIter)); if (pCommith->iters == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } // Loop to create iters for each skiplist pSlIter = tSkipListCreateIter(pMem->pSlIdx); if (pSlIter == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } for (int i = 0; i < pCommith->niters; i++) { tSkipListIterNext(pSlIter); pNode = tSkipListIterGet(pSlIter); pTbData = (STbData *)pNode->pData; pCommitIter = pCommith->iters + i; pTSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0); // TODO: schema version if (pTSchema) { pCommitIter->pIter = tSkipListCreateIter(pTbData->pData); tSkipListIterNext(pCommitIter->pIter); pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable)); pCommitIter->pTable->uid = pTbData->uid; pCommitIter->pTable->tid = pTbData->uid; pCommitIter->pTable->pSchema = pTSchema; // metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0); } } return 0; } static void tsdbDestroyCommitIters(SCommitH *pCommith) { if (pCommith->iters == NULL) return; for (int i = 1; i < pCommith->niters; i++) { tSkipListDestroyIter(pCommith->iters[i].pIter); tdFreeSchema(pCommith->iters[i].pTable->pSchema); taosMemoryFree(pCommith->iters[i].pTable); } taosMemoryFree(pCommith->iters); pCommith->iters = NULL; pCommith->niters = 0; } static void tsdbResetCommitFile(SCommitH *pCommith) { pCommith->isRFileSet = false; pCommith->isDFileSame = false; pCommith->isLFileSame = false; taosArrayClear(pCommith->aBlkIdx); } static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { SDiskID did; STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); if (tfsAllocDisk(REPO_TFS(pRepo), tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; return -1; } // Open read FSET if (pSet) { if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) { return -1; } pCommith->isRFileSet = true; if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet), TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); } else { pCommith->isRFileSet = false; } // Set and open commit FSET if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) { // Create a new FSET to write data tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo))); if (tsdbCreateDFileSet(pRepo, pWSet, true) < 0) { tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno)); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); } return -1; } pCommith->isDFileSame = false; pCommith->isLFileSame = false; tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet)); } else { did.level = TSDB_FSET_LEVEL(pSet); did.id = TSDB_FSET_ID(pSet); pCommith->wSet.fid = fid; pCommith->wSet.state = 0; // TSDB_FILE_HEAD SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); if (tsdbCreateDFile(pRepo, pWHeadf, true, TSDB_FILE_HEAD) < 0) { tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf), tstrerror(terrno)); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } } // TSDB_FILE_DATA SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); tsdbInitDFileEx(pWDataf, pRDataf); // if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { if (tsdbOpenDFile(pWDataf, TD_FILE_WRITE) < 0) { tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } } pCommith->isDFileSame = true; // TSDB_FILE_LAST SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh)); SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith); if (pRLastf->info.size < 32 * 1024) { tsdbInitDFileEx(pWLastf, pRLastf); pCommith->isLFileSame = true; // if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { if (tsdbOpenDFile(pWLastf, TD_FILE_WRITE) < 0) { tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } } } else { tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); pCommith->isLFileSame = false; if (tsdbCreateDFile(pRepo, pWLastf, true, TSDB_FILE_LAST) < 0) { tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); (void)tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } } } // TSDB_FILE_SMAD SDFile *pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh)); SDFile *pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith); if (!taosCheckExistFile(TSDB_FILE_FULL_NAME(pRSmadF))) { tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF)); tsdbInitDFile(pRepo, pWSmadF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD); if (tsdbCreateDFile(pRepo, pWSmadF, true, TSDB_FILE_SMAD) < 0) { tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); (void)tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } } } else { tsdbInitDFileEx(pWSmadF, pRSmadF); if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) { tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } } } // TSDB_FILE_SMAL SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh)); SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith); if ((pCommith->isLFileSame) && taosCheckExistFile(TSDB_FILE_FULL_NAME(pRSmalF))) { tsdbInitDFileEx(pWSmalF, pRSmalF); if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) { tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } } } else { tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF)); tsdbInitDFile(pRepo, pWSmalF, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL); if (tsdbCreateDFile(pRepo, pWSmalF, true, TSDB_FILE_SMAL) < 0) { tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF), tstrerror(terrno)); tsdbCloseDFileSet(pWSet); (void)tsdbRemoveDFile(pWHeadf); if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); return -1; } } } } return 0; } // extern int32_t tsTsdbMetaCompactRatio; int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx) { size_t nSupBlocks; size_t nSubBlocks; uint32_t tlen; SBlockInfo *pBlkInfo; int64_t offset; SBlock *pBlock; memset(pIdx, 0, sizeof(*pIdx)); nSupBlocks = taosArrayGetSize(pSupA); nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); if (nSupBlocks <= 0) { // No data (data all deleted) return 0; } tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; pBlkInfo = *ppBuf; pBlkInfo->delimiter = TSDB_FILE_DELIMITER; pBlkInfo->tid = TABLE_TID(pTable); pBlkInfo->uid = TABLE_UID(pTable); memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); if (nSubBlocks > 0) { memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); for (int i = 0; i < nSupBlocks; i++) { pBlock = pBlkInfo->blocks + i; if (pBlock->numOfSubBlocks > 1) { pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); } } } taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { return -1; } tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); // Set pIdx pBlock = taosArrayGetLast(pSupA); pIdx->uid = TABLE_UID(pTable); pIdx->hasLast = pBlock->last ? 1 : 0; pIdx->maxKey = pBlock->keyLast; pIdx->numOfBlocks = (uint32_t)nSupBlocks; pIdx->len = tlen; pIdx->offset = (uint32_t)offset; return 0; } int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { SBlockIdx *pBlkIdx; size_t nidx = taosArrayGetSize(pIdxA); int tlen = 0, size; int64_t offset; if (nidx <= 0) { // All data are deleted pHeadf->info.offset = 0; pHeadf->info.len = 0; return 0; } for (size_t i = 0; i < nidx; i++) { pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; void *ptr = POINTER_SHIFT(*ppBuf, tlen); tsdbEncodeSBlockIdx(&ptr, pBlkIdx); tlen += size; } tlen += sizeof(TSCKSUM); if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { return -1; } tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); pHeadf->info.offset = (uint32_t)offset; pHeadf->info.len = tlen; return 0; } // =================== Commit Time-Series Data static int tsdbCommitToTable(SCommitH *pCommith, int tid) { SCommitIter *pIter = pCommith->iters + tid; TSKEY nextKey = tsdbNextIterKey(pIter->pIter); tsdbResetCommitTable(pCommith); // Set commit table if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { return -1; } // No disk data and no memory data, just return if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { return 0; } // Must has disk data or has memory data int nBlocks; int bidx = 0; SBlock *pBlock; if (pCommith->readh.pBlkIdx) { if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { return -1; } nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; } else { nBlocks = 0; } if (bidx < nBlocks) { pBlock = pCommith->readh.pBlkInfo->blocks + bidx; } else { pBlock = NULL; } while (true) { if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break; if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { if (tsdbMoveBlock(pCommith, bidx) < 0) { return -1; } bidx++; if (bidx < nBlocks) { pBlock = pCommith->readh.pBlkInfo->blocks + bidx; } else { pBlock = NULL; } } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { // merge pBlock data and memory data if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { return -1; } bidx++; if (bidx < nBlocks) { pBlock = pCommith->readh.pBlkInfo->blocks + bidx; } else { pBlock = NULL; } nextKey = tsdbNextIterKey(pIter->pIter); } else { // Only commit memory data if (pBlock == NULL) { if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { return -1; } } else { if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { return -1; } } nextKey = tsdbNextIterKey(pIter->pIter); } } if (tsdbWriteBlockInfo(pCommith) < 0) { tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); return -1; } return 0; } static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { SReadH *pReadh = &pCommith->readh; int nBlocks = pIdx->numOfBlocks; int bidx = 0; tsdbResetCommitTable(pCommith); pReadh->pBlkIdx = pIdx; if (tsdbLoadBlockInfo(pReadh, NULL) < 0) { return -1; } while (bidx < nBlocks) { if (tsdbMoveBlock(pCommith, bidx) < 0) { tsdbError("vgId:%d failed to move block into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); return -1; } ++bidx; } STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL}; TSDB_COMMIT_TABLE(pCommith) = &table; if (tsdbWriteBlockInfo(pCommith) < 0) { tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); return -1; } return 0; } static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); pCommith->pTable = pTable; if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } if (pCommith->isRFileSet) { if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) { return -1; } } else { pCommith->readh.pBlkIdx = NULL; } return 0; } static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { TSKEY key = *(TSKEY *)arg1; SBlock *pBlock = (SBlock *)arg2; if (key < pBlock->keyFirst) { return -1; } else if (key > pBlock->keyLast) { return 1; } else { return 0; } } /** * @brief Write SDataCols to data file. * * @param pRepo * @param pTable * @param pDFile * @param pDFileAggr * @param pDataCols The pDataCols would be generated from mem/imem directly with 2 bits bitmap or from tsdbRead * interface with 1 bit bitmap. * @param pBlock * @param isLast * @param isSuper * @param ppBuf * @param ppCBuf * @param ppExBuf * @return int */ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { STsdbCfg *pCfg = REPO_CFG(pRepo); SBlockData *pBlockData = NULL; SAggrBlkData *pAggrBlkData = NULL; STSchema *pSchema = pTable->pSchema; int64_t offset = 0, offsetAggr = 0; int rowsToWrite = pDataCols->numOfRows; ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRows); ASSERT((!isLast) || rowsToWrite < pCfg->minRows); // Make buffer space if (tsdbMakeRoom(ppBuf, tsdbBlockStatisSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) { return -1; } pBlockData = (SBlockData *)(*ppBuf); if (tsdbMakeRoom(ppExBuf, tsdbBlockAggrSize(pDataCols->numOfCols, SBlockVerLatest)) < 0) { return -1; } pAggrBlkData = (SAggrBlkData *)(*ppExBuf); // Get # of cols not all NULL(not including key column) col_id_t nColsNotAllNull = 0; col_id_t nColsOfBlockSma = 0; for (int ncol = 1; ncol < pDataCols->numOfCols; ++ncol) { // ncol from 1, we skip the timestamp column STColumn *pColumn = pSchema->columns + ncol; SDataCol *pDataCol = pDataCols->cols + ncol; SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; SAggrBlkCol *pAggrBlkCol = (SAggrBlkCol *)pAggrBlkData + nColsOfBlockSma; if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it continue; } memset(pBlockCol, 0, sizeof(*pBlockCol)); memset(pAggrBlkCol, 0, sizeof(*pAggrBlkCol)); pBlockCol->colId = pDataCol->colId; pBlockCol->type = pDataCol->type; pAggrBlkCol->colId = pDataCol->colId; if (isSuper && IS_BSMA_ON(pColumn) && tDataTypes[pDataCol->type].statisFunc) { #if 0 (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); #endif (*tDataTypes[pDataCol->type].statisFunc)(pDataCols->bitmapMode, pDataCol->pBitmap, pDataCol->pData, rowsToWrite, &(pAggrBlkCol->min), &(pAggrBlkCol->max), &(pAggrBlkCol->sum), &(pAggrBlkCol->minIndex), &(pAggrBlkCol->maxIndex), &(pAggrBlkCol->numOfNull)); if (pAggrBlkCol->numOfNull == 0) { pBlockCol->blen = 0; } else { pBlockCol->blen = 1; } ++nColsOfBlockSma; } else if (tdIsBitmapBlkNorm(pDataCol->pBitmap, rowsToWrite, pDataCols->bitmapMode)) { // check if all rows normal pBlockCol->blen = 0; } else { pBlockCol->blen = 1; } ++nColsNotAllNull; } ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); // Compress the data if neccessary int tcol = 0; // counter of not all NULL and written columns uint32_t toffset = 0; int32_t tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest); int32_t lsize = tsize; uint32_t tsizeAggr = (uint32_t)tsdbBlockAggrSize(nColsOfBlockSma, SBlockVerLatest); int32_t keyLen = 0; int32_t nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite); int32_t sBitmaps = isSuper ? (int32_t)TD_BITMAP_BYTES_I(rowsToWrite) : nBitmaps; for (int ncol = 0; ncol < pDataCols->numOfCols; ++ncol) { // All not NULL columns finish if (ncol != 0 && tcol >= nColsNotAllNull) break; SDataCol *pDataCol = pDataCols->cols + ncol; SBlockCol *pBlockCol = pBlockData->cols + tcol; if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; int32_t flen; // final length int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite, pDataCols->bitmapMode); #ifdef TD_SUPPORT_BITMAP int32_t tBitmaps = 0; int32_t tBitmapsLen = 0; if ((ncol != 0) && (pBlockCol->blen > 0)) { tBitmaps = isSuper ? sBitmaps : nBitmaps; } #endif void *tptr, *bptr; // Make room if (tsdbMakeRoom(ppBuf, lsize + tlen + tBitmaps + 2 * COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) { return -1; } pBlockData = (SBlockData *)(*ppBuf); pBlockCol = pBlockData->cols + tcol; tptr = POINTER_SHIFT(pBlockData, lsize); if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { return -1; } // Compress or just copy if (pCfg->compression) { #if 0 flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite + tBitmaps, tptr, tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, tlen + COMP_OVERFLOW_BYTES); #endif flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, tlen + COMP_OVERFLOW_BYTES); if (tBitmaps > 0) { bptr = POINTER_SHIFT(pBlockData, lsize + flen); if (isSuper && !tdDataColsIsBitmapI(pDataCols)) { tdMergeBitmap((uint8_t *)pDataCol->pBitmap, rowsToWrite, (uint8_t *)pDataCol->pBitmap); } tBitmapsLen = tsCompressTinyint((char *)pDataCol->pBitmap, tBitmaps, tBitmaps, bptr, tBitmaps + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, tBitmaps + COMP_OVERFLOW_BYTES); TASSERT((tBitmapsLen > 0) && (tBitmapsLen <= (tBitmaps + COMP_OVERFLOW_BYTES))); flen += tBitmapsLen; } } else { flen = tlen; memcpy(tptr, pDataCol->pData, flen); if (tBitmaps > 0) { bptr = POINTER_SHIFT(pBlockData, lsize + flen); memcpy(bptr, pDataCol->pBitmap, tBitmaps); tBitmapsLen = tBitmaps; flen += tBitmapsLen; } } // Add checksum ASSERT(flen > 0); ASSERT(tBitmapsLen <= 1024); flen += sizeof(TSCKSUM); taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM))); if (ncol != 0) { tsdbSetBlockColOffset(pBlockCol, toffset); pBlockCol->len = flen; // data + bitmaps pBlockCol->blen = tBitmapsLen; ++tcol; } else { keyLen = flen; } toffset += flen; lsize += flen; } pBlockData->delimiter = TSDB_FILE_DELIMITER; pBlockData->uid = TABLE_UID(pTable); pBlockData->numOfCols = nColsNotAllNull; taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM))); // Write the whole block to file if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) { return -1; } uint32_t aggrStatus = nColsOfBlockSma > 0 ? 1 : 0; if (aggrStatus > 0) { taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr); tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM))); // Write the whole block to file if (tsdbAppendDFile(pDFileAggr, (void *)pAggrBlkData, tsizeAggr, &offsetAggr) < tsizeAggr) { return -1; } } // Update pBlock membership variables pBlock->last = isLast; pBlock->offset = offset; pBlock->algorithm = pCfg->compression; pBlock->numOfRows = rowsToWrite; pBlock->len = lsize; pBlock->keyLen = keyLen; pBlock->numOfSubBlocks = isSuper ? 1 : 0; pBlock->numOfCols = nColsNotAllNull; pBlock->numOfBSma = nColsOfBlockSma; pBlock->keyFirst = dataColsKeyFirst(pDataCols); pBlock->keyLast = dataColsKeyLast(pDataCols); pBlock->aggrStat = aggrStatus; pBlock->blkVer = SBlockVerLatest; pBlock->aggrOffset = (uint64_t)offsetAggr; tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64 " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, REPO_ID(pRepo), TABLE_UID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); return 0; } static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper) { return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols, pBlock, isLast, isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))), (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith)))); } static int tsdbWriteBlockInfo(SCommitH *pCommih) { SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); SBlockIdx blkIdx; STable *pTable = TSDB_COMMIT_TABLE(pCommih); if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))), &blkIdx) < 0) { return -1; } if (blkIdx.numOfBlocks == 0) { return 0; } if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } return 0; } static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg *pCfg = REPO_CFG(pRepo); SMergeInfo mInfo; int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); SDFile *pDFile; bool isLast; SBlock block; while (true) { tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, pCfg->update, &mInfo); if (pCommith->pDataCols->numOfRows <= 0) break; if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRows) { pDFile = TSDB_COMMIT_DATA_FILE(pCommith); isLast = false; } else { pDFile = TSDB_COMMIT_LAST_FILE(pCommith); isLast = true; } if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { return -1; } } return 0; } static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg *pCfg = REPO_CFG(pRepo); int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; TSKEY keyLimit; int16_t colId = PRIMARYKEY_TIMESTAMP_COL_ID; SMergeInfo mInfo; SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; SBlock block, supBlock; SDFile *pDFile; if (bidx == nBlocks - 1) { keyLimit = pCommith->maxKey; } else { keyLimit = pBlock[1].keyFirst - 1; } SSkipListIterator titer = *(pIter->pIter); if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1, false) < 0) return -1; tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); if (mInfo.nOperations == 0) { // no new data to insert (all updates denied) if (tsdbMoveBlock(pCommith, bidx) < 0) { return -1; } *(pIter->pIter) = titer; } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { // Ignore the block ASSERT(0); *(pIter->pIter) = titer; } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { // Add a sub-block tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); if (pBlock->last) { pDFile = TSDB_COMMIT_LAST_FILE(pCommith); } else { pDFile = TSDB_COMMIT_DATA_FILE(pCommith); } if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; if (pBlock->numOfSubBlocks == 1) { subBlocks[0] = *pBlock; subBlocks[0].numOfSubBlocks = 0; } else { memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), sizeof(SBlock) * pBlock->numOfSubBlocks); } subBlocks[pBlock->numOfSubBlocks] = block; supBlock = *pBlock; supBlock.keyFirst = mInfo.keyFirst; supBlock.keyLast = mInfo.keyLast; supBlock.numOfSubBlocks++; supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; } else { if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; } return 0; } static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; SDFile *pDFile; SBlock block; bool isSameFile; ASSERT(pBlock->numOfSubBlocks > 0); if (pBlock->last) { pDFile = TSDB_COMMIT_LAST_FILE(pCommith); isSameFile = pCommith->isLFileSame; } else { pDFile = TSDB_COMMIT_DATA_FILE(pCommith); isSameFile = pCommith->isDFileSame; } if (isSameFile) { if (pBlock->numOfSubBlocks == 1) { if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) { return -1; } } else { block = *pBlock; block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk); if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), pBlock->numOfSubBlocks) < 0) { return -1; } } } else { if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; if (tsdbWriteBlock(pCommith, pDFile, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1; if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; } return 0; } static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } if (pSubBlocks && taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } return 0; } static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg *pCfg = REPO_CFG(pRepo); SBlock block; SDFile *pDFile; bool isLast; int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); int biter = 0; while (true) { tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, pCfg->update); if (pCommith->pDataCols->numOfRows == 0) break; if (isLastOneBlock) { if (pCommith->pDataCols->numOfRows < pCfg->minRows) { pDFile = TSDB_COMMIT_LAST_FILE(pCommith); isLast = true; } else { pDFile = TSDB_COMMIT_DATA_FILE(pCommith); isLast = false; } } else { pDFile = TSDB_COMMIT_DATA_FILE(pCommith); isLast = false; } if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; } return 0; } static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update) { TSKEY key1 = INT64_MAX; TSKEY key2 = INT64_MAX; TSKEY lastKey = TSKEY_INITIAL_VAL; STSchema *pSchema = NULL; ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); tdResetDataCols(pTarget); pTarget->bitmapMode = pDataCols->bitmapMode; // TODO: filter Multi-Version // TODO: support delete function while (true) { key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); STSRow *row = tsdbNextIterRow(pCommitIter->pIter); if (row == NULL || TD_ROW_KEY(row) > maxKey) { key2 = INT64_MAX; } else { key2 = TD_ROW_KEY(row); } if (key1 == INT64_MAX && key2 == INT64_MAX) break; if (key1 < key2) { if (lastKey != TSKEY_INITIAL_VAL) { ++pTarget->numOfRows; } for (int i = 0; i < pDataCols->numOfCols; ++i) { // TODO: dataColAppendVal may fail SCellVal sVal = {0}; if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) { TASSERT(0); } tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, pTarget->bitmapMode, false); } lastKey = key1; ++(*iter); } else if (key1 > key2) { if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); ASSERT(pSchema != NULL); } if (key2 == lastKey) { if (TD_SUPPORT_UPDATE(update)) { tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); } } else { if (lastKey != TSKEY_INITIAL_VAL) { ++pTarget->numOfRows; } tdAppendSTSRowToDataCol(row, pSchema, pTarget, false); lastKey = key2; } tSkipListIterNext(pCommitIter->pIter); } else { if (lastKey != key1) { lastKey = key1; ++pTarget->numOfRows; } // copy disk data for (int i = 0; i < pDataCols->numOfCols; ++i) { SCellVal sVal = {0}; if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter, pDataCols->bitmapMode) < 0) { TASSERT(0); } // TODO: tdAppendValToDataCol may fail tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints, pTarget->bitmapMode, false); } if (TD_SUPPORT_UPDATE(update)) { // copy mem data(Multi-Version) if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); ASSERT(pSchema != NULL); } // TODO: merge with Multi-Version tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); } ++(*iter); tSkipListIterNext(pCommitIter->pIter); } if (pTarget->numOfRows >= (maxRows - 1)) break; } if (lastKey != TSKEY_INITIAL_VAL) { ++pTarget->numOfRows; } } static void tsdbResetCommitTable(SCommitH *pCommith) { taosArrayClear(pCommith->aSubBlk); taosArrayClear(pCommith->aSupBlk); pCommith->pTable = NULL; } static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); } if (!hasError) { TSDB_FSET_FSYNC(TSDB_COMMIT_WRITE_FSET(pCommith)); } tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg *pCfg = REPO_CFG(pRepo); int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; ASSERT(mergeRows > 0); if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRows) { if (pBlock->last) { if (pCommith->isLFileSame && mergeRows < pCfg->minRows) return true; } else { if (pCommith->isDFileSame && mergeRows <= pCfg->maxRows) return true; } } return false; }