From 421d087a8f11d75ec00323290d5b3039eb434196 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 12 Mar 2022 19:19:21 +0800 Subject: [PATCH] add tsdbBDBImpl files --- include/common/taosdef.h | 6 + source/dnode/vnode/inc/tsdb.h | 2 + source/dnode/vnode/src/inc/tsdbDBDef.h | 35 ++++ source/dnode/vnode/src/inc/tsdbDef.h | 16 +- source/dnode/vnode/src/inc/tsdbFile.h | 12 +- source/dnode/vnode/src/inc/tsdbSma.h | 48 +++++- source/dnode/vnode/src/tsdb/tsdbBDBImpl.c | 112 +++++++++++++ source/dnode/vnode/src/tsdb/tsdbMain.c | 29 +++- source/dnode/vnode/src/tsdb/tsdbSma.c | 196 +++++++++++++++++----- source/dnode/vnode/src/tsdb/tsdbWrite.c | 8 + source/dnode/vnode/test/tsdbSmaTest.cpp | 53 +++++- 11 files changed, 464 insertions(+), 53 deletions(-) create mode 100644 source/dnode/vnode/src/inc/tsdbDBDef.h diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 9e5e5ebdcf..8dc634571f 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -61,6 +61,12 @@ typedef enum { TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired } ETsdbSmaStat; +typedef enum { + TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA + TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA + TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA +} ETsdbSmaType; + extern char *qtypeStr[]; #define TSDB_PORT_HTTP 11 diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index e67e0cae4b..25bac86f71 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -96,6 +96,8 @@ int tsdbCommit(STsdb *pTsdb); */ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg); + /** * @brief Insert RSma(Time-range-wise Rollup SMA) data. * diff --git a/source/dnode/vnode/src/inc/tsdbDBDef.h b/source/dnode/vnode/src/inc/tsdbDBDef.h new file mode 100644 index 0000000000..cc40cec7d1 --- /dev/null +++ b/source/dnode/vnode/src/inc/tsdbDBDef.h @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_DB_DEF_H_ +#define _TD_TSDB_DB_DEF_H_ + +#include "db.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SDBFile SDBFile; +typedef DB_ENV* TDBEnv; + +int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF); +void tsdbCloseDBF(SDBFile* pDBF); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TSDB_DB_DEF_H_*/ diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 1451ac9685..0956e418bb 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -43,6 +43,8 @@ extern "C" { struct STsdb { int32_t vgId; + bool repoLocked; + pthread_mutex_t mutex; char * path; STsdbCfg config; STsdbMemTable * mem; @@ -52,12 +54,18 @@ struct STsdb { STsdbFS * fs; SMeta * pMeta; STfs * pTfs; - SSmaStat * pSmaStat; + SSmaEnv * pTSmaEnv; + SSmaEnv * pRSmaEnv; + // SSmaStat * pSmaStat; }; -#define REPO_ID(r) ((r)->vgId) -#define REPO_CFG(r) (&(r)->config) -#define REPO_FS(r) (r)->fs +#define REPO_ID(r) ((r)->vgId) +#define REPO_CFG(r) (&(r)->config) +#define REPO_FS(r) (r)->fs +#define IS_REPO_LOCKED(r) (r)->repoLocked + +int tsdbLockRepo(STsdb *pTsdb); +int tsdbUnlockRepo(STsdb *pTsdb); static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) { return pTable->pSchema; diff --git a/source/dnode/vnode/src/inc/tsdbFile.h b/source/dnode/vnode/src/inc/tsdbFile.h index 5cc8cc045e..e65ef72623 100644 --- a/source/dnode/vnode/src/inc/tsdbFile.h +++ b/source/dnode/vnode/src/inc/tsdbFile.h @@ -329,21 +329,23 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) { // =============== SDFileSet typedef struct { int fid; - int8_t state; // -128~127 - uint8_t ver; // 0~255, DFileSet version + int8_t state; // -128~127 + uint8_t ver; // 0~255, DFileSet version uint16_t reserve; SDFile files[TSDB_FILE_MAX]; } SDFileSet; typedef struct { - int fid; - int8_t state; - uint8_t ver; + int fid; + int8_t state; + uint8_t ver; + uint16_t reserve; #if 0 SDFInfo info; #endif STfsFile f; TdFilePtr pFile; + } SSFile; // files split by days with fid #define TSDB_LATEST_FSET_VER 0 diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 7fceb580d6..87e20b8aa9 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -17,27 +17,40 @@ #define _TD_TSDB_SMA_H_ typedef struct SSmaStat SSmaStat; +typedef struct SSmaEnv SSmaEnv; + + +struct SSmaEnv { + pthread_rwlock_t lock; + char * path; + SSmaStat * pStat; +}; + +#define SMA_ENV_LOCK(env) ((env)->lock) +#define SMA_ENV_PATH(env) ((env)->path) +#define SMA_ENV_STAT(env) ((env)->pStat) +#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems) // insert/update interface int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); - // query interface // TODO: This is the basic params, and should wrap the params to a queryHandle. int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult); // management interface -int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg); -int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); +int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg); +void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); +void * tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); #if 0 int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); #endif -// internal func +// internal func static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { int32_t len = 0; len += taosEncodeFixedI64(pData, tableUid); @@ -46,4 +59,31 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, return len; } +static FORCE_INLINE int tsdbRLockSma(SSmaEnv *pEnv) { + int code = pthread_rwlock_rdlock(&(pEnv->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int tsdbWLockSma(SSmaEnv *pEnv) { + int code = pthread_rwlock_wrlock(&(pEnv->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int tsdbUnLockSma(SSmaEnv *pEnv) { + int code = pthread_rwlock_unlock(&(pEnv->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + #endif /* _TD_TSDB_SMA_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c index f2f48bbc8a..7ea9f134cf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c @@ -12,3 +12,115 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +#define ALLOW_FORBID_FUNC +#include "db.h" + +#include "tcoding.h" +#include "thash.h" +#include "tsdbDBDef.h" + +#define IMPL_WITH_LOCK 1 + +struct SDBFile { + DB * pDB; + char *path; +}; + +static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path); +static void tsdbCloseBDBEnv(DB_ENV *pEnv); +static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup); +static void tsdbCloseBDBDb(DB *pDB); + +#define BDB_PERR(info, code) fprintf(stderr, info " reason: %s", db_strerror(code)) + +int tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) { + // TDBEnv is shared by a group of SDBFile + ASSERT(pEnv != NULL); + + // Open DBF + if (tsdbOpenBDBDb(&(pDBF->pDB), pEnv, pDBF->path, false) < 0) { + tsdbCloseBDBDb(pDBF->pDB); + return -1; + } + + return 0; +} + +static void *tsdbFreeDBF(SDBFile *pDBF) { + if (pDBF) { + free(pDBF); + } + return NULL; +} + +void tsdbCloseDBF(SDBFile *pDBF) { + if (pDBF->pDB) { + tsdbCloseBDBDb(pDBF->pDB); + pDBF->pDB = tsdbFreeDBF(pDBF); + } +} + +static int tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { + int ret = 0; + DB_ENV *pEnv = NULL; + + if (path == NULL) return 0; + + ret = db_env_create(&pEnv, 0); + if (ret != 0) { + BDB_PERR("Failed to create tsdb env", ret); + return -1; + } + + ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0); + if (ret != 0) { + BDB_PERR("Failed to open tsdb env", ret); + return -1; + } + + *ppEnv = pEnv; + + return 0; +} + +static void tsdbCloseBDBEnv(DB_ENV *pEnv) { + if (pEnv) { + pEnv->close(pEnv, 0); + } +} + +static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) { + int ret; + DB *pDB; + + ret = db_create(&(pDB), pEnv, 0); + if (ret != 0) { + BDB_PERR("Failed to create DBP", ret); + return -1; + } + + if (isDup) { + ret = pDB->set_flags(pDB, DB_DUPSORT); + if (ret != 0) { + BDB_PERR("Failed to set DB flags", ret); + return -1; + } + } + + ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0); + if (ret) { + BDB_PERR("Failed to open DBF", ret); + return -1; + } + + *ppDB = pDB; + + return 0; +} + +static void tsdbCloseBDBDb(DB *pDB) { + if (pDB) { + pDB->close(pDB, 0); + } +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index 0c82911226..afa8921c00 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -80,6 +80,8 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, pTsdb->pmaf = pMAF; pTsdb->pMeta = pMeta; pTsdb->pTfs = pTfs; + pTsdb->pTSmaEnv = NULL; + pTsdb->pRSmaEnv = NULL; pTsdb->fs = tsdbNewFS(pTsdbCfg); @@ -88,8 +90,9 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, static void tsdbFree(STsdb *pTsdb) { if (pTsdb) { + tsdbFreeSmaEnv(pTsdb->pRSmaEnv); + tsdbFreeSmaEnv(pTsdb->pTSmaEnv); tsdbFreeFS(pTsdb->fs); - tsdbDestroySmaState(pTsdb->pSmaStat); tfree(pTsdb->path); free(pTsdb); } @@ -105,6 +108,30 @@ static void tsdbCloseImpl(STsdb *pTsdb) { tsdbCloseFS(pTsdb); // TODO } + +int tsdbLockRepo(STsdb *pTsdb) { + int code = pthread_mutex_lock(&pTsdb->mutex); + if (code != 0) { + tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pTsdb), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + pTsdb->repoLocked = true; + return 0; +} + +int tsdbUnlockRepo(STsdb *pTsdb) { + ASSERT(IS_REPO_LOCKED(pTsdb)); + pTsdb->repoLocked = false; + int code = pthread_mutex_unlock(&pTsdb->mutex); + if (code != 0) { + tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pTsdb), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + #if 0 /* * Copyright (c) 2019 TAOS Data, Inc. diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index dc0d262725..ba8cde2121 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -16,6 +16,7 @@ #include "tsdbDef.h" #define SMA_STORAGE_TSDB_DAYS 30 +#define SMA_STORAGE_TSDB_TIMES 30 #define SMA_STORAGE_SPLIT_HOURS 24 #define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 @@ -68,19 +69,104 @@ struct SSmaStat { }; // declaration of static functions -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); -static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit); -static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); -static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); +static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); +static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); +static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path); +static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv); +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); +static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); +static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel); static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid); - static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); +static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) { + SSmaEnv *pEnv = NULL; + + pEnv = (SSmaEnv *)calloc(1, sizeof(SSmaEnv)); + if (pEnv == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + int code = pthread_rwlock_init(&(pEnv->lock), NULL); + if (code) { + terrno = TAOS_SYSTEM_ERROR(code); + free(pEnv); + return NULL; + } + + ASSERT(path && (strlen(path) > 0)); + pEnv->path = strdup(path); + if (pEnv->path == NULL) { + tsdbFreeSmaEnv(pEnv); + return NULL; + } + + if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) { + tsdbFreeSmaEnv(pEnv); + return NULL; + } + + return pEnv; +} + +static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) { + if (!pEnv) { + terrno = TSDB_CODE_INVALID_PTR; + return TSDB_CODE_FAILED; + } + + if (pEnv && *pEnv) { + return TSDB_CODE_SUCCESS; + } + + if (tsdbLockRepo(pTsdb) != 0) { + return TSDB_CODE_FAILED; + } + + if (*pEnv == NULL) { + if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) { + tsdbUnlockRepo(pTsdb); + return TSDB_CODE_FAILED; + } + } + + if (tsdbUnlockRepo(pTsdb) != 0) { + tsdbFreeSmaEnv(*pEnv); + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Release resources allocated for its member fields, not including itself. + * + * @param pSmaEnv + * @return int32_t + */ +void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) { + if (pSmaEnv) { + tsdbDestroySmaState(pSmaEnv->pStat); + tfree(pSmaEnv->pStat); + tfree(pSmaEnv->path); + pthread_rwlock_destroy(&(pSmaEnv->lock)); + } +} + +void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) { + tsdbDestroySmaEnv(pSmaEnv); + tfree(pSmaEnv); + return NULL; +} + static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { ASSERT(pSmaStat != NULL); @@ -125,6 +211,12 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { return pItem; } +/** + * @brief Release resources allocated for its member fields, not including itself. + * + * @param pSmaStat + * @return int32_t + */ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { if (pSmaStat) { // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. @@ -135,30 +227,42 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { item = taosHashIterate(pSmaStat->smaStatItems, item); } taosHashCleanup(pSmaStat->smaStatItems); - free(pSmaStat); } } /** * @brief Update expired window according to msg from stream computing module. - * - * @param pTsdb - * @param msg - * @return int32_t + * + * @param pTsdb + * @param smaType ETsdbSmaType + * @param msg + * @return int32_t */ -int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { - if (msg == NULL) { +int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { + STsdbCfg *pCfg = REPO_CFG(pTsdb); + SSmaEnv * pEnv = NULL; + + if (!msg || !pTsdb->pMeta) { + terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } - // lazy mode - if (tsdbInitSmaStat(&pTsdb->pSmaStat) != TSDB_CODE_SUCCESS) { + if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { + pEnv = pTsdb->pTSmaEnv; + } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { + pEnv = pTsdb->pRSmaEnv; + } else { + ASSERT(0); + } + + char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/"; + if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } // TODO: decode the msg => start int64_t indexUid = SMA_TEST_INDEX_UID; - const char * indexName = SMA_TEST_INDEX_NAME; + // const char * indexName = SMA_TEST_INDEX_NAME; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; int64_t now = taosGetTimestampMs(); @@ -167,9 +271,9 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { } // TODO: decode the msg <= end - SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems; + SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); - SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName)); + SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state if (pItem == NULL) { @@ -188,7 +292,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { pItem->pSma = pSma; // TODO: change indexName to indexUid - if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) { + if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) { // If error occurs during put smaStatItem, free the resources of pItem taosHashCleanup(pItem->expiredWindows); free(pItem); @@ -207,7 +311,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { // windows failed to put into hash table. taosHashCleanup(pItem->expiredWindows); tfree(pItem->pSma); - taosHashRemove(pItemsHash, indexName, sizeof(indexName)); + taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid)); return TSDB_CODE_FAILED; } } @@ -215,11 +319,12 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { return TSDB_CODE_SUCCESS; } -static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey) { +static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY skey) { SSmaStatItem *pItem = NULL; - if (pTsdb->pSmaStat && pTsdb->pSmaStat->smaStatItems) { - pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &indexUid, sizeof(indexUid)); + // TODO: If HASH_ENTRY_LOCK used, whether rwlock needed to handle cases of removing hashNode? + if (pStat && pStat->smaStatItems) { + pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); } if (pItem != NULL) { @@ -241,7 +346,7 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey * @param intervalUnit * @return int32_t */ -static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) { +static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { // TODO: configurable for SMA_STORAGE_SPLIT_HOURS? switch (intervalUnit) { case TD_TIME_UNIT_HOUR: @@ -422,12 +527,24 @@ static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWra } static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { - // TODO + STsdb *pTsdb = pSmaH->pTsdb; + pSmaH->pDFile = "tSma_interval_file_name"; return TSDB_CODE_SUCCESS; -} +} +static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel) { + STsdbCfg *pCfg = REPO_CFG(pSmaH->pTsdb); + int32_t daysPerFile = pCfg->daysPerFile; + + if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { + int32_t days = 30 * (pSmaH->interval / tsTickPerDay[pCfg->precision]); + daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS; + } + + return daysPerFile; +} /** * @brief Insert/Update Time-range-wise SMA data. @@ -454,23 +571,26 @@ int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { return terrno; } - // Step 1: Judge the storage level - int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit); - int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; - - // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file - // - Set and open the DFile or the B+Tree file + if (!pTsdb->pTSmaEnv) { + terrno = TSDB_CODE_INVALID_PTR; + return terrno; + } + // Step 1: Judge the storage level and days + int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); + int32_t daysPerFile = tsdbGetTSmaDays(&tSmaH, storageLevel); int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); - // Save all the TSma data to one file + // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file + // - Set and open the DFile or the B+Tree file // TODO: tsdbStartTSmaCommit(); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); + tsdbInsertTSmaDataSection(&tSmaH, pData); // TODO:tsdbEndTSmaCommit(); // reset the SSmaStat - tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey); + tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); return TSDB_CODE_SUCCESS; } @@ -496,7 +616,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { } // Step 1: Judge the storage level - int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit); + int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file @@ -511,7 +631,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { // TODO:tsdbEndTSmaCommit(); // reset the SSmaStat - tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey); + tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey); return TSDB_CODE_SUCCESS; } @@ -540,7 +660,7 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapp * @return int32_t */ static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { - int32_t storageLevel = 0; //tsdbJudgeStorageLevel(param->interval, param->intervalUnit); + int32_t storageLevel = 0; // tsdbGetSmaStorageLevel(param->interval, param->intervalUnit); int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; pReadH->storageLevel = storageLevel; @@ -594,7 +714,7 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { */ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult) { SSmaStatItem *pItem = - (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &pData->indexUid, sizeof(pData->indexUid)); + (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &pData->indexUid, sizeof(pData->indexUid)); if (pItem == NULL) { // mark all window as expired and notify query module to query raw TS data. return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 26d31af4f3..dede1502f4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -51,6 +51,14 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { return code; } +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) { + tsdbWarn("vgId:%d update expired window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + /** * @brief Insert Time-range-wise Rollup Sma(RSma) data * diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index ac9a8fd3d0..159ad98219 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -221,12 +221,60 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { #if 1 TEST(testCase, tSmaInsertTest) { + // prepare meta + const char * smaIndexName1 = "sma_index_test_1"; + const char * smaIndexName2 = "sma_index_test_2"; + const char * timezone = "Asia/Shanghai"; + const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; + const char * tagsFilter = "I'm tags filter"; + const char * smaTestDir = "./smaTest"; + const tb_uid_t tbUid = 1234567890; + const int64_t indexUid1 = 2000000001; + const int64_t indexUid2 = 2000000002; + const uint32_t nCntTSma = 2; + // encode + STSma tSma = {0}; + tSma.version = 0; + tSma.intervalUnit = TD_TIME_UNIT_DAY; + tSma.interval = 1; + tSma.slidingUnit = TD_TIME_UNIT_HOUR; + tSma.sliding = 0; + tSma.indexUid = indexUid1; + tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); + tstrncpy(tSma.timezone, timezone, TD_TIMEZONE_LEN); + tSma.tableUid = tbUid; + + tSma.exprLen = strlen(expr); + tSma.expr = (char *)calloc(tSma.exprLen + 1, 1); + tstrncpy(tSma.expr, expr, tSma.exprLen + 1); + + tSma.tagsFilterLen = strlen(tagsFilter); + tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1); + tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1); + + SMeta * pMeta = NULL; + STSma * pSmaCfg = &tSma; + const SMetaCfg *pMetaCfg = &defaultMetaOptions; + + taosRemoveDir(smaTestDir); + + pMeta = metaOpen(smaTestDir, pMetaCfg, NULL); + assert(pMeta != NULL); + // save index 1 + EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); + + + // insert data const int64_t indexUid = 2000000002; STSmaDataWrapper *pSmaData = NULL; STsdb tsdb = {0}; STsdbCfg * pCfg = &tsdb.config; pCfg->daysPerFile = 1; + tsdb.pMeta = pMeta; + + char *msg = (char *)calloc(100, 1); + EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); // init int32_t allocCnt = 0; @@ -277,8 +325,11 @@ TEST(testCase, tSmaInsertTest) { // execute EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); - // release + // release data taosTZfree(buf); + // release meta + tdDestroyTSma(&tSma); + metaClose(pMeta); } #endif -- GitLab