diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d0e005f990739b75b8d77274b9a1f5111d63226b..5a9576f0c1d54bc26e896102180063db69638666 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -43,10 +43,8 @@ typedef struct STable { #define TABLE_TID(t) (t)->tid #define TABLE_UID(t) (t)->uid -STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, - STfs *pTfs); +STsdb *tsdbOpen(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); void tsdbClose(STsdb *); -void tsdbRemove(const char *path); int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb); @@ -167,16 +165,14 @@ struct STsdb { SRtn rtn; SMemAllocatorFactory *pmaf; STsdbFS *fs; - SMeta *pMeta; - STfs *pTfs; SSmaEnvs smaEnvs; }; #define REPO_ID(r) ((r)->vgId) #define REPO_CFG(r) (&(r)->config) #define REPO_FS(r) ((r)->fs) -#define REPO_META(r) ((r)->pMeta) -#define REPO_TFS(r) ((r)->pTfs) +#define REPO_META(r) ((r)->pVnode->pMeta) +#define REPO_TFS(r) ((r)->pVnode->pTfs) #define IS_REPO_LOCKED(r) ((r)->repoLocked) #define REPO_TSMA_NUM(r) ((r)->smaEnvs.nTSma) #define REPO_RSMA_NUM(r) ((r)->smaEnvs.nRSma) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index a1edb7cd9c32148ccfda3ccef7597b4f20544304..25ef18cb362a6c80ccc4f9928281ee7631ecb464 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -100,7 +100,7 @@ int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { level = tsdbGetFidLevel(pSet->fid, pRtn); - if (tfsAllocDisk(pRepo->pTfs, level, &did) < 0) { + if (tfsAllocDisk(pRepo->pVnode->pTfs, level, &did) < 0) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; return -1; } @@ -427,7 +427,7 @@ static int tsdbCreateCommitIters(SCommitH *pCommith) { pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable)); pCommitIter->pTable->uid = pTbData->uid; pCommitIter->pTable->tid = pTbData->uid; - pCommitIter->pTable->pSchema = metaGetTbTSchema(pRepo->pMeta, pTbData->uid, 0); + pCommitIter->pTable->pSchema = metaGetTbTSchema(REPO_META(pRepo), pTbData->uid, 0); } return 0; @@ -459,7 +459,7 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); - if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) { + if (tfsAllocDisk(REPO_TFS(pRepo), tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 866c02cbb316063b122ba87e2d85cbcbe8a39d46..0012a50ccfd4450ea96969afacc2d95c608a9cf9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -644,7 +644,7 @@ static int tsdbComparFidFSet(const void *arg1, const void *arg2) { } static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) { - snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(pRepo->pTfs), pRepo->vgId, + snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", tfsGetPrimaryPath(REPO_TFS(pRepo)), pRepo->vgId, tsdbTxnFname[ftype]); } @@ -912,7 +912,7 @@ static int tsdbScanRootDir(STsdb *pRepo) { const STfsFile *pf; tsdbGetRootDir(REPO_ID(pRepo), rootDir); - STfsDir *tdir = tfsOpendir(pRepo->pTfs, rootDir); + STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), rootDir); if (tdir == NULL) { tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), rootDir, tstrerror(terrno)); return -1; @@ -946,7 +946,7 @@ static int tsdbScanDataDir(STsdb *pRepo) { const STfsFile *pf; tsdbGetDataDir(REPO_ID(pRepo), dataDir); - STfsDir *tdir = tfsOpendir(pRepo->pTfs, dataDir); + STfsDir *tdir = tfsOpendir(REPO_TFS(pRepo), dataDir); if (tdir == NULL) { tsdbError("vgId:%d failed to open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno)); return -1; @@ -1128,7 +1128,7 @@ static int tsdbRestoreDFileSet(STsdb *pRepo) { return -1; } - tdir = tfsOpendir(pRepo->pTfs, dataDir); + tdir = tfsOpendir(REPO_TFS(pRepo), dataDir); if (tdir == NULL) { tsdbError("vgId:%d failed to restore DFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno)); diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 8e5a6afc4dee400c9c33999bcd37e7ed41a9e1db..2fbe819476a3479a8b706e0388cb9d6da980971f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -311,7 +311,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t pDFile->info.fver = tsdbGetDFSVersion(ftype); tsdbGetFilename(pRepo->vgId, fid, ver, ftype, fname); - tfsInitFile(pRepo->pTfs, &(pDFile->f), did, fname); + tfsInitFile(REPO_TFS(pRepo), &(pDFile->f), did, fname); } void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile) { @@ -330,7 +330,7 @@ int tsdbEncodeSDFile(void **buf, SDFile *pDFile) { void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile) { buf = tsdbDecodeDFInfo(buf, &(pDFile->info)); - buf = tfsDecodeFile(pRepo->pTfs, buf, &(pDFile->f)); + buf = tfsDecodeFile(REPO_TFS(pRepo), buf, &(pDFile->f)); TSDB_FILE_SET_CLOSED(pDFile); return buf; @@ -365,7 +365,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T if (errno == ENOENT) { // Try to create directory recursively char *s = strdup(TSDB_FILE_REL_NAME(pDFile)); - if (tfsMkdirRecurAt(pRepo->pTfs, taosDirName(s), TSDB_FILE_DID(pDFile)) < 0) { + if (tfsMkdirRecurAt(REPO_TFS(pRepo), taosDirName(s), TSDB_FILE_DID(pDFile)) < 0) { taosMemoryFreeClear(s); return -1; } @@ -443,7 +443,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { } static int tsdbScanAndTryFixDFile(STsdb *pRepo, SDFile *pDFile) { - SDFile df; + SDFile df; tsdbInitDFileEx(&df, pDFile); diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index 8543cde04689e4df8b0ad3646406ae889f4dc391..dd8723366dd7871363b9f870506961c9620ace17 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -15,14 +15,12 @@ #include "vnodeInt.h" -static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, - SMeta *pMeta, STfs *pTfs); +static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF); static void tsdbFree(STsdb *pTsdb); static int tsdbOpenImpl(STsdb *pTsdb); static void tsdbCloseImpl(STsdb *pTsdb); -STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, - STfs *pTfs) { +STsdb *tsdbOpen(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) { STsdb *pTsdb = NULL; // Set default TSDB Options @@ -37,7 +35,7 @@ STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAl } // Create the handle - pTsdb = tsdbNew(path, vgId, pTsdbCfg, pMAF, pMeta, pTfs); + pTsdb = tsdbNew(path, pVnode, pTsdbCfg, pMAF); if (pTsdb == NULL) { // TODO: handle error return NULL; @@ -61,11 +59,8 @@ void tsdbClose(STsdb *pTsdb) { } } -void tsdbRemove(const char *path) { taosRemoveDir(path); } - /* ------------------------ STATIC METHODS ------------------------ */ -static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, - SMeta *pMeta, STfs *pTfs) { +static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) { STsdb *pTsdb = NULL; pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb)); @@ -75,11 +70,10 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, } pTsdb->path = strdup(path); - pTsdb->vgId = vgId; + pTsdb->vgId = TD_VID(pVnode); + pTsdb->pVnode = pVnode; tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg); pTsdb->pmaf = pMAF; - pTsdb->pMeta = pMeta; - pTsdb->pTfs = pTfs; pTsdb->fs = tsdbNewFS(pTsdbCfg); return pTsdb; @@ -156,7 +150,7 @@ int tsdbUnlockRepo(STsdb *pTsdb) { #define IS_VALID_PRECISION(precision) \ (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) -#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP +#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP #define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1314ef7d1eef95397125198e45fa4c6ba98478a0..f8eed6c3915bfb4e7a2de7923f693c80954d64a0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -986,7 +986,7 @@ static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, in pCheckInfo->numOfBlocks = 0; STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId}; - table.pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); + table.pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0); if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) { code = terrno; @@ -1091,7 +1091,7 @@ static int32_t doLoadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBl int32_t slotIndex) { int64_t st = taosGetTimestampUs(); - STSchema* pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); + STSchema* pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0); int32_t code = tdInitDataCols(pTsdbReadHandle->pDataCols, pSchema); if (code != TSDB_CODE_SUCCESS) { tsdbError("%p failed to malloc buf for pDataCols, %s", pTsdbReadHandle, pTsdbReadHandle->idStr); @@ -1483,7 +1483,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit int32_t numOfColsOfRow1 = 0; if (pSchema1 == NULL) { - pSchema1 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row1)); + pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1)); } if (isRow1DataRow) { @@ -1496,7 +1496,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit if (row2) { isRow2DataRow = TD_IS_TP_ROW(row2); if (pSchema2 == NULL) { - pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, TD_ROW_SVER(row2)); + pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2)); } if (isRow2DataRow) { numOfColsOfRow2 = schemaNCols(pSchema2); @@ -2514,7 +2514,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int win->ekey = key; if (rv != TD_ROW_SVER(row)) { - pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); + pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0); rv = TD_ROW_SVER(row); } mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index b98fe8936a566ebfb8a633683806b8693cf9c043..4b1c213cdd3d7407ec4436468ac2c7b6db606115 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -260,7 +260,7 @@ static void poolFree(void *arg, void *ptr) { int32_t tsdbInitSma(STsdb *pTsdb) { // tSma - int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(pTsdb->pMeta, false)); + int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(REPO_META(pTsdb), false)); if (numOfTSma > 0) { atomic_store_16(&REPO_TSMA_NUM(pTsdb), (int16_t)numOfTSma); } @@ -348,7 +348,7 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) } char aname[TSDB_FILENAME_LEN] = {0}; - tfsAbsoluteName(pTsdb->pTfs, did, path, aname); + tfsAbsoluteName(REPO_TFS(pTsdb), did, path, aname); if (tsdbOpenDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) { tsdbFreeSmaEnv(pEnv); return NULL; @@ -519,14 +519,14 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { char rname[TSDB_FILENAME_LEN] = {0}; SDiskID did = {0}; - tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did); + tfsAllocDisk(REPO_TFS(pTsdb), TFS_PRIMARY_LEVEL, &did); if (did.level < 0 || did.id < 0) { tsdbUnlockRepo(pTsdb); return TSDB_CODE_FAILED; } tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname); - if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) { + if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rname, did) != TSDB_CODE_SUCCESS) { tsdbUnlockRepo(pTsdb); return TSDB_CODE_FAILED; } @@ -557,7 +557,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t } // cache smaMeta - STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid, true); + STSma *pSma = metaGetSmaInfoByIndex(REPO_META(pTsdb), indexUid, true); if (pSma == NULL) { terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; taosHashCleanup(pItem->expiredWindows); @@ -613,7 +613,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers return TSDB_CODE_SUCCESS; } - if (!pTsdb->pMeta) { + if (!REPO_META(pTsdb)) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } @@ -1583,7 +1583,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { // record current timezone of server side vCreateSmaReq.tSma.timezoneInt = tsTimezone; - if (metaCreateTSma(pTsdb->pMeta, &vCreateSmaReq) < 0) { + if (metaCreateTSma(REPO_META(pTsdb), &vCreateSmaReq) < 0) { // TODO: handle error tdDestroyTSma(&vCreateSmaReq.tSma); return -1; @@ -1610,7 +1610,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { // } // - if (metaDropTSma(pTsdb->pMeta, vDropSmaReq.indexUid) < 0) { + if (metaDropTSma(REPO_META(pTsdb), vDropSmaReq.indexUid) < 0) { // TODO: handle error return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 5d4b07ec185d813ef0e984879f08240a11569856..9e4aa714e29ded8854c0d2602561e14162acbb0b 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -96,8 +96,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open tsdb sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TSDB_DIR); - pVnode->pTsdb = - tsdbOpen(tdir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); + pVnode->pTsdb = tsdbOpen(tdir, pVnode, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode)); if (pVnode->pTsdb == NULL) { vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err;