From 8a709fdbf3ddd848756b8459b3283dfd5fa82fb2 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 18 Mar 2022 20:11:33 +0800 Subject: [PATCH] refactor and put sma file in smaUid dir --- source/dnode/vnode/src/inc/tsdbDef.h | 1 + source/dnode/vnode/src/inc/tsdbFS.h | 24 ++++---- source/dnode/vnode/src/inc/tsdbSma.h | 6 +- source/dnode/vnode/src/tsdb/tsdbSma.c | 80 +++++++++++++++---------- source/dnode/vnode/test/tsdbSmaTest.cpp | 2 + 5 files changed, 69 insertions(+), 44 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 5e4c852621..69bd1ccea5 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -63,6 +63,7 @@ struct STsdb { #define REPO_ID(r) ((r)->vgId) #define REPO_CFG(r) (&(r)->config) #define REPO_FS(r) (r)->fs +#define REPO_TFS(r) (r)->pTfs #define IS_REPO_LOCKED(r) (r)->repoLocked #define REPO_SMA_ENV(r, t) ((TSDB_SMA_TYPE_ROLLUP == (t)) ? (r)->pRSmaEnv : (r)->pTSmaEnv) diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index 96c5f8468f..21b445755f 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -42,23 +42,23 @@ typedef struct { typedef struct { STsdbFSMeta meta; // FS meta SArray * df; // data file array - SArray * sf; // sma data file array v2(t|r)1900.index_name_1 + SArray * sf; // sma data file array v2f1900.index_name_1 } SFSStatus; /** * @brief Directory structure of .tsma data files. * - * /vnode2/tsdb $ tree .sma/ - * .sma/ - * ├── v2t100.index_name_1 - * ├── v2t101.index_name_1 - * ├── v2t102.index_name_1 - * ├── v2t1900.index_name_3 - * ├── v2t1901.index_name_3 - * ├── v2t1902.index_name_3 - * ├── v2t200.index_name_2 - * ├── v2t201.index_name_2 - * └── v2t202.index_name_2 + * /vnode2/tsdb $ tree tsma/ + * tsma/ + * ├── v2f100.index_name_1 + * ├── v2f101.index_name_1 + * ├── v2f102.index_name_1 + * ├── v2f1900.index_name_3 + * ├── v2f1901.index_name_3 + * ├── v2f1902.index_name_3 + * ├── v2f200.index_name_2 + * ├── v2f201.index_name_2 + * └── v2f202.index_name_2 * * 0 directories, 9 files */ diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 649b5a2d47..b94f0b1b83 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -21,12 +21,14 @@ typedef struct SSmaEnv SSmaEnv; struct SSmaEnv { pthread_rwlock_t lock; - TDBEnv dbEnv; - char * path; + SDiskID did; + TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level? + char * path; // relative path SSmaStat * pStat; }; #define SMA_ENV_LOCK(env) ((env)->lock) +#define SMA_ENV_DID(env) ((env)->did) #define SMA_ENV_ENV(env) ((env)->dbEnv) #define SMA_ENV_PATH(env) ((env)->path) #define SMA_ENV_STAT(env) ((env)->pStat) diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 02a0b587d5..512ea9fa25 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -67,7 +67,7 @@ typedef struct { */ int8_t state; // ETsdbSmaStat SHashObj *expiredWindows; // key: skey of time window, value: N/A - STSma * pSma; + STSma * pSma; // cache schema } SSmaStatItem; struct SSmaStat { @@ -81,8 +81,8 @@ struct SSmaStat { static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); -static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path); -static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv); +static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did); +static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv); static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey); static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat); static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat); @@ -102,8 +102,8 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel); -static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid); -static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey); +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid); +static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey); static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey); static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]); static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); @@ -111,10 +111,11 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); // implementation static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) { - snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vgId, TSDB_SMA_DNAME[smaType]); + snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP, + TSDB_SMA_DNAME[smaType]); } -static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) { +static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) { SSmaEnv *pEnv = NULL; pEnv = (SSmaEnv *)calloc(1, sizeof(SSmaEnv)); @@ -137,12 +138,16 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) { return NULL; } + pEnv->did = did; + if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) { tsdbFreeSmaEnv(pEnv); return NULL; } - if (tsdbOpenBDBEnv(&pEnv->dbEnv, pEnv->path) != TSDB_CODE_SUCCESS) { + char aname[TSDB_FILENAME_LEN] = {0}; + tfsAbsoluteName(pTsdb->pTfs, did, path, aname); + if (tsdbOpenBDBEnv(&pEnv->dbEnv, aname) != TSDB_CODE_SUCCESS) { tsdbFreeSmaEnv(pEnv); return NULL; } @@ -150,14 +155,14 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) { return pEnv; } -static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) { +static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv) { if (!pEnv) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } if (*pEnv == NULL) { - if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) { + if ((*pEnv = tsdbNewSmaEnv(pTsdb, path, did)) == NULL) { return TSDB_CODE_FAILED; } } @@ -296,7 +301,6 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&pTsdb->pTSmaEnv) : atomic_load_ptr(&pTsdb->pRSmaEnv); if (pEnv == NULL) { char rname[TSDB_FILENAME_LEN] = {0}; - char aname[TSDB_FILENAME_LEN] = {0}; // use TSDB_FILENAME_LEN currently SDiskID did = {0}; tfsAllocDisk(pTsdb->pTfs, TFS_PRIMARY_LEVEL, &did); @@ -305,14 +309,13 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_FAILED; } tsdbGetSmaDir(REPO_ID(pTsdb), smaType, rname); - tfsAbsoluteName(pTsdb->pTfs, did, rname, aname); if (tfsMkdirRecurAt(pTsdb->pTfs, rname, did) != TSDB_CODE_SUCCESS) { tsdbUnlockRepo(pTsdb); return TSDB_CODE_FAILED; } - if (tsdbInitSmaEnv(pTsdb, aname, &pEnv) != TSDB_CODE_SUCCESS) { + if (tsdbInitSmaEnv(pTsdb, rname, did, &pEnv) != TSDB_CODE_SUCCESS) { tsdbUnlockRepo(pTsdb); return TSDB_CODE_FAILED; } @@ -339,11 +342,6 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) { return TSDB_CODE_FAILED; } - if (tsdbCheckAndInitSmaEnv(pTsdb, smaType) != TSDB_CODE_SUCCESS) { - terrno = TSDB_CODE_TDB_INIT_FAILED; - return TSDB_CODE_FAILED; - } - // TODO: decode the msg from Stream Computing module => start int64_t indexUid = SMA_TEST_INDEX_UID; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; @@ -354,6 +352,11 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg) { } // TODO: decode the msg <= end + if (tsdbCheckAndInitSmaEnv(pTsdb, smaType) != TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_TDB_INIT_FAILED; + return TSDB_CODE_FAILED; + } + SSmaEnv * pEnv = REPO_SMA_ENV(pTsdb, smaType); SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); @@ -660,14 +663,13 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) { } } -static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int64_t indexUid, int32_t fid) { STsdb *pTsdb = pSmaH->pTsdb; ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL); pSmaH->dFile.fid = fid; - char tSmaFile[TSDB_FILENAME_LEN] = {0}; - snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid); + snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid); pSmaH->dFile.path = strdup(tSmaFile); return TSDB_CODE_SUCCESS; @@ -708,8 +710,9 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { STsdbCfg * pCfg = REPO_CFG(pTsdb); STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; + SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); - if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) { + if (pEnv == NULL) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); return terrno; @@ -727,6 +730,17 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { return TSDB_CODE_FAILED; } + int64_t indexUid = SMA_TEST_INDEX_UID; + char rPath[TSDB_FILENAME_LEN] = {0}; + char aPath[TSDB_FILENAME_LEN] = {0}; + snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid); + tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath); + if (!taosCheckExistFile(aPath)) { + if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } + } + // Step 1: Judge the storage level and days int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); @@ -735,7 +749,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file // - Set and open the DFile or the B+Tree file // TODO: tsdbStartTSmaCommit(); - tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); + tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid); if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) { tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb), tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); @@ -822,13 +836,16 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interv * @brief Init of tSma FS * * @param pReadH + * @param indexUid * @param skey * @return int32_t */ -static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey) { - int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pSmaH->pTsdb)->precision)); +static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, int64_t indexUid, TSKEY skey) { + STsdb *pTsdb = pSmaH->pTsdb; + + int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pTsdb)->precision)); char tSmaFile[TSDB_FILENAME_LEN] = {0}; - snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pSmaH->pTsdb), fid); + snprintf(tSmaFile, TSDB_FILENAME_LEN, "%" PRIi64 "%sv%df%d.tsma", indexUid, TD_DIRSEP, REPO_ID(pTsdb), fid); pSmaH->dFile.path = strdup(tSmaFile); pSmaH->smaFsIter.iter = 0; pSmaH->smaFsIter.fid = fid; @@ -887,14 +904,16 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { - if (!atomic_load_ptr(&pTsdb->pTSmaEnv)) { + SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); + + if (!pEnv) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); return TSDB_CODE_FAILED; } - tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); - SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid)); + tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pEnv)); + SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid)); if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) { // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if // it's NULL. @@ -926,11 +945,12 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); #endif + STSmaReadH tReadH = {0}; tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit); tsdbCloseDBF(&tReadH.dFile); - tsdbInitTSmaFile(&tReadH, querySKey); + tsdbInitTSmaFile(&tReadH, indexUid, querySKey); if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) { tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno)); return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 5a87c180b6..bdeb51c018 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -33,6 +33,7 @@ int main(int argc, char **argv) { return RUN_ALL_TESTS(); } +#if 1 TEST(testCase, tSma_Meta_Encode_Decode_Test) { // encode STSma tSma = {0}; @@ -88,6 +89,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) { tdDestroyTSma(&tSma); tdDestroyTSmaWrapper(&dstTSmaWrapper); } +#endif #if 1 TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { -- GitLab