diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 9e5e5ebdcf3fa0c9cd975ccee09fd39a5b543dac..8dc634571fae9865041adca41daa46c467661ebc 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -61,6 +61,12 @@ typedef enum { TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired } ETsdbSmaStat; +typedef enum { + TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA + TSDB_SMA_TYPE_TIME_RANGE = 1, // Time-range-wise SMA + TSDB_SMA_TYPE_ROLLUP = 2, // Rollup SMA +} ETsdbSmaType; + extern char *qtypeStr[]; #define TSDB_PORT_HTTP 11 diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 3a1343b3848e4a9496ccd7b5b98968bd933f29c3..1c61d738b5d70f013f292ecaf03c70e4ceddb13b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -353,6 +353,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TDB_MESSED_MSG TAOS_DEF_ERROR_CODE(0, 0x0614) #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) #define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) +#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0617) // query #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index e67e0cae4bbd3fbbd35730537c5dc91397e1d18f..87edfb8dde59deabdb28b8d6b03ae7bf1a67cb6c 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -95,6 +95,7 @@ int tsdbCommit(STsdb *pTsdb); * @return int32_t */ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg); /** * @brief Insert RSma(Time-range-wise Rollup SMA) data. @@ -105,6 +106,12 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); */ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); +// TODO: This is the basic params, and should wrap the params to a queryHandle. +int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, + int32_t nMaxResult); + + // STsdbCfg int tsdbOptionsInit(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *); diff --git a/source/dnode/vnode/src/inc/tsdbDBDef.h b/source/dnode/vnode/src/inc/tsdbDBDef.h new file mode 100644 index 0000000000000000000000000000000000000000..2e37b0ba45132a92e4de5b7e0a21e680564256bc --- /dev/null +++ b/source/dnode/vnode/src/inc/tsdbDBDef.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_DB_DEF_H_ +#define _TD_TSDB_DB_DEF_H_ + +#include "db.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SDBFile SDBFile; +typedef DB_ENV* TDBEnv; + +struct SDBFile { + DB* pDB; + char* path; +}; + +int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF); +void tsdbCloseDBF(SDBFile* pDBF); +int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path); +void tsdbCloseBDBEnv(DB_ENV* pEnv); +int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize); +void* tsdbGetSmaDataByKey(SDBFile* pDBF, void* key, uint32_t keySize, uint32_t* valueSize); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TSDB_DB_DEF_H_*/ diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 1451ac9685ef108cf100358e3be07150aab5db05..6f91b4d3abb4bf608c50ca15973f83b19fb1dbf6 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -27,6 +27,7 @@ #include "ttime.h" #include "tsdb.h" +#include "tsdbDBDef.h" #include "tsdbCommit.h" #include "tsdbFS.h" #include "tsdbFile.h" @@ -37,12 +38,15 @@ #include "tsdbReadImpl.h" #include "tsdbSma.h" + #ifdef __cplusplus extern "C" { #endif struct STsdb { int32_t vgId; + bool repoLocked; + pthread_mutex_t mutex; char * path; STsdbCfg config; STsdbMemTable * mem; @@ -52,12 +56,17 @@ struct STsdb { STsdbFS * fs; SMeta * pMeta; STfs * pTfs; - SSmaStat * pSmaStat; + SSmaEnv * pTSmaEnv; + SSmaEnv * pRSmaEnv; }; -#define REPO_ID(r) ((r)->vgId) -#define REPO_CFG(r) (&(r)->config) -#define REPO_FS(r) (r)->fs +#define REPO_ID(r) ((r)->vgId) +#define REPO_CFG(r) (&(r)->config) +#define REPO_FS(r) (r)->fs +#define IS_REPO_LOCKED(r) (r)->repoLocked + +int tsdbLockRepo(STsdb *pTsdb); +int tsdbUnlockRepo(STsdb *pTsdb); static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) { return pTable->pSchema; diff --git a/source/dnode/vnode/src/inc/tsdbFile.h b/source/dnode/vnode/src/inc/tsdbFile.h index 5cc8cc045e54699d780ea12d75dc0f5945d8ff5b..e65ef72623f9b12803765c80d63de07f13bbda07 100644 --- a/source/dnode/vnode/src/inc/tsdbFile.h +++ b/source/dnode/vnode/src/inc/tsdbFile.h @@ -329,21 +329,23 @@ static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) { // =============== SDFileSet typedef struct { int fid; - int8_t state; // -128~127 - uint8_t ver; // 0~255, DFileSet version + int8_t state; // -128~127 + uint8_t ver; // 0~255, DFileSet version uint16_t reserve; SDFile files[TSDB_FILE_MAX]; } SDFileSet; typedef struct { - int fid; - int8_t state; - uint8_t ver; + int fid; + int8_t state; + uint8_t ver; + uint16_t reserve; #if 0 SDFInfo info; #endif STfsFile f; TdFilePtr pFile; + } SSFile; // files split by days with fid #define TSDB_LATEST_FSET_VER 0 diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 7fceb580d61da473d3266a2c52424583a5d92f6c..649b5a2d47a102ee7fdf34b649609ffc7a08d4c2 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -17,27 +17,29 @@ #define _TD_TSDB_SMA_H_ typedef struct SSmaStat SSmaStat; +typedef struct SSmaEnv SSmaEnv; -// insert/update interface -int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); -int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); +struct SSmaEnv { + pthread_rwlock_t lock; + TDBEnv dbEnv; + char * path; + SSmaStat * pStat; +}; +#define SMA_ENV_LOCK(env) ((env)->lock) +#define SMA_ENV_ENV(env) ((env)->dbEnv) +#define SMA_ENV_PATH(env) ((env)->path) +#define SMA_ENV_STAT(env) ((env)->pStat) +#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems) -// query interface -// TODO: This is the basic params, and should wrap the params to a queryHandle. -int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult); - -// management interface -int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg); -int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); +void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); +void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); #if 0 int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); #endif // internal func - - static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { int32_t len = 0; len += taosEncodeFixedI64(pData, tableUid); @@ -46,4 +48,31 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, return len; } +static FORCE_INLINE int tsdbRLockSma(SSmaEnv *pEnv) { + int code = pthread_rwlock_rdlock(&(pEnv->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int tsdbWLockSma(SSmaEnv *pEnv) { + int code = pthread_rwlock_wrlock(&(pEnv->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + +static FORCE_INLINE int tsdbUnLockSma(SSmaEnv *pEnv) { + int code = pthread_rwlock_unlock(&(pEnv->lock)); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + #endif /* _TD_TSDB_SMA_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index d9af526c2a2c073e7188467d1f50bc10ddaadd20..a729288e34dff33dde6e0e9f8764bdd7a4a937b3 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -231,30 +231,31 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { void *pBuf = NULL, *qBuf = NULL; DBT key1 = {0}, value1 = {0}; - { - // save sma info - int32_t len = tEncodeTSma(NULL, pSmaCfg); - pBuf = calloc(len, 1); - if (pBuf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + // save sma info + int32_t len = tEncodeTSma(NULL, pSmaCfg); + pBuf = calloc(len, 1); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - key1.data = (void *)&pSmaCfg->indexUid; - key1.size = sizeof(pSmaCfg->indexUid); + key1.data = (void *)&pSmaCfg->indexUid; + key1.size = sizeof(pSmaCfg->indexUid); - qBuf = pBuf; - tEncodeTSma(&qBuf, pSmaCfg); + qBuf = pBuf; + tEncodeTSma(&qBuf, pSmaCfg); - value1.data = pBuf; - value1.size = POINTER_DISTANCE(qBuf, pBuf); - value1.app_data = pSmaCfg; - } + value1.data = pBuf; + value1.size = POINTER_DISTANCE(qBuf, pBuf); + value1.app_data = pSmaCfg; metaDBWLock(pMeta->pDB); pMeta->pDB->pSmaDB->put(pMeta->pDB->pSmaDB, NULL, &key1, &value1, 0); metaDBULock(pMeta->pDB); + // release + tfree(pBuf); + return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c index f2f48bbc8a69a022d0fc6b8a88c5a9a55d0b4ad6..cf3351c5d892c9c884a3a1050d9e9e92c3977251 100644 --- a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c @@ -12,3 +12,162 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +#define ALLOW_FORBID_FUNC +#include "db.h" + +#include "taoserror.h" +#include "tcoding.h" +#include "thash.h" +#include "tsdbDBDef.h" +#include "tsdbLog.h" + +#define IMPL_WITH_LOCK 1 + +static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup); +static void tsdbCloseBDBDb(DB *pDB); + +#define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code)) + +int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) { + // TDBEnv is shared by a group of SDBFile + if (!pEnv) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + // Open DBF + if (tsdbOpenBDBDb(&(pDBF->pDB), pEnv, pDBF->path, false) < 0) { + terrno = TSDB_CODE_TDB_INIT_FAILED; + tsdbCloseBDBDb(pDBF->pDB); + return -1; + } + + return 0; +} + +void tsdbCloseDBF(SDBFile *pDBF) { + if (pDBF->pDB) { + tsdbCloseBDBDb(pDBF->pDB); + pDBF->pDB = NULL; + } + tfree(pDBF->path); +} + +int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { + int ret = 0; + DB_ENV *pEnv = NULL; + + if (path == NULL) return 0; + + ret = db_env_create(&pEnv, 0); + if (ret != 0) { + BDB_PERR("Failed to create tsdb env", ret); + return -1; + } + + ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0); + if (ret != 0) { + // BDB_PERR("Failed to open tsdb env", ret); + tsdbWarn("Failed to open tsdb env for path %s since %d", path ? path : "NULL", ret); + return -1; + } + + *ppEnv = pEnv; + + return 0; +} + +void tsdbCloseBDBEnv(DB_ENV *pEnv) { + if (pEnv) { + pEnv->close(pEnv, 0); + } +} + +static int tsdbOpenBDBDb(DB **ppDB, DB_ENV *pEnv, const char *pFName, bool isDup) { + int ret; + DB *pDB; + + ret = db_create(&(pDB), pEnv, 0); + if (ret != 0) { + BDB_PERR("Failed to create DBP", ret); + return -1; + } + + if (isDup) { + ret = pDB->set_flags(pDB, DB_DUPSORT); + if (ret != 0) { + BDB_PERR("Failed to set DB flags", ret); + return -1; + } + } + + ret = pDB->open(pDB, NULL, pFName, NULL, DB_BTREE, DB_CREATE, 0); + if (ret) { + BDB_PERR("Failed to open DBF", ret); + return -1; + } + + *ppDB = pDB; + + return 0; +} + +static void tsdbCloseBDBDb(DB *pDB) { + if (pDB) { + pDB->close(pDB, 0); + } +} + +int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data, uint32_t dataSize) { + int ret; + DBT key1 = {0}, value1 = {0}; + + key1.data = key; + key1.size = keySize; + + value1.data = data; + value1.size = dataSize; + + // TODO: lock + ret = pDBF->pDB->put(pDBF->pDB, NULL, &key1, &value1, 0); + if (ret) { + BDB_PERR("Failed to put data to DBF", ret); + // TODO: unlock + return -1; + } + // TODO: unlock + + return 0; +} + +void *tsdbGetSmaDataByKey(SDBFile *pDBF, void* key, uint32_t keySize, uint32_t *valueSize) { + void *result = NULL; + DBT key1 = {0}; + DBT value1 = {0}; + int ret; + + // Set key/value + key1.data = key; + key1.size = keySize; + + // Query + // TODO: lock + ret = pDBF->pDB->get(pDBF->pDB, NULL, &key1, &value1, 0); + // TODO: unlock + if (ret != 0) { + return NULL; + } + + result = calloc(1, value1.size); + + if (result == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + *valueSize = value1.size; + memcpy(result, value1.data, value1.size); + + return result; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index 0c82911226605487d5297efff59120342c29a85f..afa8921c004aa37282d86ff44f2d1f4850f2dfdc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -80,6 +80,8 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, pTsdb->pmaf = pMAF; pTsdb->pMeta = pMeta; pTsdb->pTfs = pTfs; + pTsdb->pTSmaEnv = NULL; + pTsdb->pRSmaEnv = NULL; pTsdb->fs = tsdbNewFS(pTsdbCfg); @@ -88,8 +90,9 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, static void tsdbFree(STsdb *pTsdb) { if (pTsdb) { + tsdbFreeSmaEnv(pTsdb->pRSmaEnv); + tsdbFreeSmaEnv(pTsdb->pTSmaEnv); tsdbFreeFS(pTsdb->fs); - tsdbDestroySmaState(pTsdb->pSmaStat); tfree(pTsdb->path); free(pTsdb); } @@ -105,6 +108,30 @@ static void tsdbCloseImpl(STsdb *pTsdb) { tsdbCloseFS(pTsdb); // TODO } + +int tsdbLockRepo(STsdb *pTsdb) { + int code = pthread_mutex_lock(&pTsdb->mutex); + if (code != 0) { + tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pTsdb), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + pTsdb->repoLocked = true; + return 0; +} + +int tsdbUnlockRepo(STsdb *pTsdb) { + ASSERT(IS_REPO_LOCKED(pTsdb)); + pTsdb->repoLocked = false; + int code = pthread_mutex_unlock(&pTsdb->mutex); + if (code != 0) { + tsdbError("vgId:%d failed to unlock tsdb since %s", REPO_ID(pTsdb), strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(code); + return -1; + } + return 0; +} + #if 0 /* * Copyright (c) 2019 TAOS Data, Inc. diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index dc0d262725449e495e827b128092553a8e64d2ca..7335e4f58543d00ded7227d54a1be0606cf81de5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -15,7 +15,9 @@ #include "tsdbDef.h" +#undef SMA_PRINT_DEBUG_LOG #define SMA_STORAGE_TSDB_DAYS 30 +#define SMA_STORAGE_TSDB_TIMES 30 #define SMA_STORAGE_SPLIT_HOURS 24 #define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 @@ -23,7 +25,7 @@ #define SMA_STATE_ITEM_HASH_SLOT 32 #define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test -#define SMA_TEST_INDEX_UID 123456 // TODO: just for test +#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test typedef enum { SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat @@ -31,23 +33,22 @@ typedef enum { typedef struct { STsdb * pTsdb; - char * pDFile; // TODO: use the real DFile type, not char* - int32_t interval; // interval with the precision of DB - // TODO + SDBFile dFile; + int32_t interval; // interval with the precision of DB } STSmaWriteH; typedef struct { int32_t iter; + int32_t fid; } SmaFsIter; typedef struct { STsdb * pTsdb; - char * pDFile; // TODO: use the real DFile type, not char* + SDBFile dFile; int32_t interval; // interval with the precision of DB int32_t blockSize; // size of SMA block item int8_t storageLevel; int8_t days; SmaFsIter smaFsIter; - // TODO } STSmaReadH; typedef struct { @@ -68,18 +69,117 @@ struct SSmaStat { }; // declaration of static functions -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); -static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit); -static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); -static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); +static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); +static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); +// TODO: This is the basic params, and should wrap the params to a queryHandle. +static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, + int32_t nMaxResult); +static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg); + +static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); +static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); +static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path); +static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv); +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit); +static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); +static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); +static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel); static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid); +static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey); +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey); -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); -static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); -static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); +static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) { + SSmaEnv *pEnv = NULL; + + pEnv = (SSmaEnv *)calloc(1, sizeof(SSmaEnv)); + if (pEnv == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + int code = pthread_rwlock_init(&(pEnv->lock), NULL); + if (code) { + terrno = TAOS_SYSTEM_ERROR(code); + free(pEnv); + return NULL; + } + + ASSERT(path && (strlen(path) > 0)); + pEnv->path = strdup(path); + if (pEnv->path == NULL) { + tsdbFreeSmaEnv(pEnv); + return NULL; + } + + if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) { + tsdbFreeSmaEnv(pEnv); + return NULL; + } + + if (tsdbOpenBDBEnv(&pEnv->dbEnv, pEnv->path) != TSDB_CODE_SUCCESS) { + tsdbFreeSmaEnv(pEnv); + return NULL; + } + + return pEnv; +} + +static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) { + if (!pEnv) { + terrno = TSDB_CODE_INVALID_PTR; + return TSDB_CODE_FAILED; + } + + if (pEnv && *pEnv) { + return TSDB_CODE_SUCCESS; + } + + if (tsdbLockRepo(pTsdb) != 0) { + return TSDB_CODE_FAILED; + } + + if (*pEnv == NULL) { + if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) { + tsdbUnlockRepo(pTsdb); + return TSDB_CODE_FAILED; + } + } + + if (tsdbUnlockRepo(pTsdb) != 0) { + tsdbFreeSmaEnv(*pEnv); + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Release resources allocated for its member fields, not including itself. + * + * @param pSmaEnv + * @return int32_t + */ +void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) { + if (pSmaEnv) { + tsdbDestroySmaState(pSmaEnv->pStat); + tfree(pSmaEnv->pStat); + tfree(pSmaEnv->path); + pthread_rwlock_destroy(&(pSmaEnv->lock)); + tsdbCloseBDBEnv(pSmaEnv->dbEnv); + } +} + +void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) { + tsdbDestroySmaEnv(pSmaEnv); + tfree(pSmaEnv); + return NULL; +} static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { ASSERT(pSmaStat != NULL); @@ -125,6 +225,12 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { return pItem; } +/** + * @brief Release resources allocated for its member fields, not including itself. + * + * @param pSmaStat + * @return int32_t + */ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { if (pSmaStat) { // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. @@ -135,7 +241,6 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { item = taosHashIterate(pSmaStat->smaStatItems, item); } taosHashCleanup(pSmaStat->smaStatItems); - free(pSmaStat); } } @@ -143,22 +248,35 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { * @brief Update expired window according to msg from stream computing module. * * @param pTsdb + * @param smaType ETsdbSmaType * @param msg * @return int32_t */ -int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { - if (msg == NULL) { +int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) { + STsdbCfg *pCfg = REPO_CFG(pTsdb); + SSmaEnv * pEnv = NULL; + + if (!msg || !pTsdb->pMeta) { + terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } - // lazy mode - if (tsdbInitSmaStat(&pTsdb->pSmaStat) != TSDB_CODE_SUCCESS) { + char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/"; + if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } + if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { + pTsdb->pTSmaEnv = pEnv; + } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { + pTsdb->pRSmaEnv = pEnv; + } else { + ASSERT(0); + } + // TODO: decode the msg => start - int64_t indexUid = SMA_TEST_INDEX_UID; - const char * indexName = SMA_TEST_INDEX_NAME; + int64_t indexUid = SMA_TEST_INDEX_UID; + // const char * indexName = SMA_TEST_INDEX_NAME; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; int64_t now = taosGetTimestampMs(); @@ -167,9 +285,9 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { } // TODO: decode the msg <= end - SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems; + SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); - SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName)); + SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state if (pItem == NULL) { @@ -181,20 +299,28 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { // cache smaMeta STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid); if (pSma == NULL) { + terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; taosHashCleanup(pItem->expiredWindows); free(pItem); + tsdbWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid, + tstrerror(terrno)); return TSDB_CODE_FAILED; } pItem->pSma = pSma; // TODO: change indexName to indexUid - if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) { + if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) { // If error occurs during put smaStatItem, free the resources of pItem taosHashCleanup(pItem->expiredWindows); free(pItem); return TSDB_CODE_FAILED; } } +#if 0 + SSmaStatItem *pItem1 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); + int size1 = taosHashGetSize(pItem1->expiredWindows); + tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d before hashPut", REPO_ID(pTsdb), indexUid, size1); +#endif int8_t state = TSDB_SMA_STAT_EXPIRED; for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { @@ -207,21 +333,28 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { // windows failed to put into hash table. taosHashCleanup(pItem->expiredWindows); tfree(pItem->pSma); - taosHashRemove(pItemsHash, indexName, sizeof(indexName)); + taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid)); return TSDB_CODE_FAILED; } } +#if 0 + SSmaStatItem *pItem2 = (SSmaStatItem *)taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); + int size2 = taosHashGetSize(pItem1->expiredWindows); + tsdbWarn("vgId:%d smaIndex %" PRIi64 " size is %d after hashPut", REPO_ID(pTsdb), indexUid, size2); +#endif + return TSDB_CODE_SUCCESS; } -static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey) { +static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY skey) { SSmaStatItem *pItem = NULL; - if (pTsdb->pSmaStat && pTsdb->pSmaStat->smaStatItems) { - pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &indexUid, sizeof(indexUid)); + // TODO: If HASH_ENTRY_LOCK used, whether rwlock needed to handle cases of removing hashNode? + if (pStat && pStat->smaStatItems) { + pItem = (SSmaStatItem *)taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); } - +#if 0 if (pItem != NULL) { // TODO: reset time window for the sma data blocks if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { @@ -231,6 +364,7 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey } else { // error handling } +#endif return TSDB_CODE_SUCCESS; } @@ -241,7 +375,7 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey * @param intervalUnit * @return int32_t */ -static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) { +static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { // TODO: configurable for SMA_STORAGE_SPLIT_HOURS? switch (intervalUnit) { case TD_TIME_UNIT_HOUR: @@ -281,18 +415,35 @@ static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) { } /** - * @brief Insert TSma data blocks to B+Tree + * @brief Insert TSma data blocks to DB File build by B+Tree * - * @param bTree + * @param pSmaH * @param smaKey + * @param keyLen * @param pData * @param dataLen * @return int32_t */ -static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) { +static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) { + SDBFile *pDBFile = &pSmaH->dFile; + // TODO: insert sma data blocks into B+Tree - tsdbDebug("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", - *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); + tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", + REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), + *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); + + if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) { + return TSDB_CODE_FAILED; + } + +#ifdef SMA_PRINT_DEBUG_LOG + uint32_t valueSize = 0; + void * data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize); + ASSERT(data != NULL); + for (uint32_t v = 0; v < valueSize; v += 8) { + tsdbWarn("vgId:%d sma data - val[%d] is %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v)); + } +#endif return TSDB_CODE_SUCCESS; } @@ -324,41 +475,41 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit } } - switch (intervalUnit) { - case TD_TIME_UNIT_MILLISEC: - if (TSDB_TIME_PRECISION_MILLI == precision) { + switch (precision) { + case TSDB_TIME_PRECISION_MILLI: + if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us + return interval / 1e3; + } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second + return interval / 1e6; + } else { return interval; - } else if (TSDB_TIME_PRECISION_MICRO == precision) { - return interval * 1e3; - } else { // nano second - return interval * 1e6; } break; - case TD_TIME_UNIT_MICROSEC: - if (TSDB_TIME_PRECISION_MILLI == precision) { - return interval / 1e3; - } else if (TSDB_TIME_PRECISION_MICRO == precision) { + case TSDB_TIME_PRECISION_MICRO: + if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us return interval; - } else { // nano second + } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second + return interval / 1e3; + } else { return interval * 1e3; } break; - case TD_TIME_UNIT_NANOSEC: - if (TSDB_TIME_PRECISION_MILLI == precision) { - return interval / 1e6; - } else if (TSDB_TIME_PRECISION_MICRO == precision) { - return interval / 1e3; - } else { // nano second + case TSDB_TIME_PRECISION_NANO: + if (TD_TIME_UNIT_MICROSEC == intervalUnit) { + return interval * 1e3; + } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second return interval; + } else { + return interval * 1e6; } break; - default: - if (TSDB_TIME_PRECISION_MILLI == precision) { - return interval * 1e3; - } else if (TSDB_TIME_PRECISION_MICRO == precision) { - return interval * 1e6; - } else { // nano second - return interval * 1e9; + default: // ms + if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us + return interval / 1e3; + } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second + return interval / 1e6; + } else { + return interval; } break; } @@ -381,8 +532,6 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p // TODO: check the data integrity - void *bTree = pSmaH->pDFile; - int32_t len = 0; while (true) { if (len >= pData->dataLen) { @@ -405,7 +554,7 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId); #endif tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey); - if (tsdbInsertTSmaBlocks(bTree, smaKey, pColData->data, pColData->blockSize) < 0) { + if (tsdbInsertTSmaBlocks(pSmaH, smaKey, SMA_KEY_LEN, pColData->data, pColData->blockSize) < 0) { tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } tbLen += (sizeof(STSmaColData) + pColData->blockSize); @@ -419,15 +568,43 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { pSmaH->pTsdb = pTsdb; pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); + return TSDB_CODE_SUCCESS; } -static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { - // TODO - pSmaH->pDFile = "tSma_interval_file_name"; +static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) { + if (pSmaH) { + tsdbCloseDBF(&pSmaH->dFile); + } +} +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { + STsdb *pTsdb = pSmaH->pTsdb; + ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL); + char tSmaFile[TSDB_FILENAME_LEN] = {0}; + snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid); + pSmaH->dFile.path = strdup(tSmaFile); return TSDB_CODE_SUCCESS; -} +} +/** + * @brief + * + * @param pTsdb + * @param interval Interval calculated by DB's precision + * @param storageLevel + * @return int32_t + */ +static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) { + STsdbCfg *pCfg = REPO_CFG(pTsdb); + int32_t daysPerFile = pCfg->daysPerFile; + + if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { + int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]); + daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS; + } + + return daysPerFile; +} /** * @brief Insert/Update Time-range-wise SMA data. @@ -441,51 +618,72 @@ static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, * @param msg * @return int32_t */ -int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { - STsdbCfg * pCfg = REPO_CFG(pTsdb); +static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { + STsdbCfg * pCfg = REPO_CFG(pTsdb); STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; - STSmaWriteH tSmaH = {0}; - tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); + if (!pTsdb->pTSmaEnv) { + terrno = TSDB_CODE_INVALID_PTR; + tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); + return terrno; + } if (pData->dataLen <= 0) { TASSERT(0); terrno = TSDB_CODE_INVALID_PARA; - return terrno; + return TSDB_CODE_FAILED; } - // Step 1: Judge the storage level - int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit); - int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; + STSmaWriteH tSmaH = {0}; - // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file - // - Set and open the DFile or the B+Tree file + if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) { + return TSDB_CODE_FAILED; + } + // Step 1: Judge the storage level and days + int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); + int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); - // Save all the TSma data to one file + // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file + // - Set and open the DFile or the B+Tree file // TODO: tsdbStartTSmaCommit(); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); - tsdbInsertTSmaDataSection(&tSmaH, pData); + if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) { + tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb), + tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); + tsdbDestroyTSmaWriteH(&tSmaH); + return TSDB_CODE_FAILED; + } + + if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) { + tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + tsdbDestroyTSmaWriteH(&tSmaH); + return TSDB_CODE_FAILED; + } // TODO:tsdbEndTSmaCommit(); - // reset the SSmaStat - tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey); + // Step 3: reset the SSmaStat + tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); + tsdbDestroyTSmaWriteH(&tSmaH); return TSDB_CODE_SUCCESS; } static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) { - // TODO - pSmaH->pDFile = "rSma_interval_file_name"; + STsdb *pTsdb = pSmaH->pTsdb; + + char tSmaFile[TSDB_FILENAME_LEN] = {0}; + snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid); + pSmaH->dFile.path = strdup(tSmaFile); return TSDB_CODE_SUCCESS; } -int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { - STsdbCfg * pCfg = REPO_CFG(pTsdb); +static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { + STsdbCfg * pCfg = REPO_CFG(pTsdb); STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; - STSmaWriteH tSmaH = {0}; + STSmaWriteH tSmaH = {0}; tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); @@ -496,7 +694,7 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { } // Step 1: Judge the storage level - int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit); + int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file @@ -507,45 +705,46 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { // Save all the TSma data to one file // TODO: tsdbStartTSmaCommit(); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); + tsdbInsertTSmaDataSection(&tSmaH, pData); // TODO:tsdbEndTSmaCommit(); // reset the SSmaStat - tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey); + tsdbResetExpiredWindow(SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey); return TSDB_CODE_SUCCESS; } /** - * @brief Init of tSma ReadH + * @brief * * @param pSmaH * @param pTsdb - * @param param - * @param pData + * @param interval + * @param intervalUnit * @return int32_t */ -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit) { pSmaH->pTsdb = pTsdb; - pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); - // pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); + pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit); + pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel); } /** * @brief Init of tSma FS * * @param pReadH - * @param param - * @param queryWin + * @param skey * @return int32_t */ -static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { - int32_t storageLevel = 0; //tsdbJudgeStorageLevel(param->interval, param->intervalUnit); - int32_t daysPerFile = - storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; - pReadH->storageLevel = storageLevel; - pReadH->days = daysPerFile; - pReadH->smaFsIter.iter = 0; +static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey) { + int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pSmaH->pTsdb)->precision)); + char tSmaFile[TSDB_FILENAME_LEN] = {0}; + snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pSmaH->pTsdb), fid); + pSmaH->dFile.path = strdup(tSmaFile); + pSmaH->smaFsIter.iter = 0; + pSmaH->smaFsIter.fid = fid; } /** @@ -557,17 +756,18 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { * @return true * @return false */ -static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf; int32_t nSmaFs = taosArrayGetSize(smaFs); - pReadH->pDFile = NULL; + tsdbCloseDBF(&pReadH->dFile); +#if 0 while (pReadH->smaFsIter.iter < nSmaFs) { void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter); if (pSmaFile) { // match(indexName, queryWindow) // TODO: select the file by index_name ... - pReadH->pDFile = pSmaFile; + pReadH->dFile = pSmaFile; ++pReadH->smaFsIter.iter; break; } @@ -578,41 +778,83 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]"); return true; } +#endif return false; } /** - * @brief Return the data between queryWin and fill the pData. + * @brief * - * @param pTsdb - * @param param + * @param pTsdb Return the data between queryWin and fill the pData. * @param pData - * @param queryWin + * @param indexUid + * @param interval + * @param intervalUnit + * @param tableUid + * @param colId + * @param pQuerySKey * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. * @return int32_t */ -int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult) { - SSmaStatItem *pItem = - (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &pData->indexUid, sizeof(pData->indexUid)); +static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, + int32_t nMaxResult) { + SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid)); if (pItem == NULL) { // mark all window as expired and notify query module to query raw TS data. return TSDB_CODE_SUCCESS; } - int32_t nQueryWin = 0; +#if 0 + int32_t nQueryWin = taosArrayGetSize(pQuerySKey); for (int32_t n = 0; n < nQueryWin; ++n) { - TSKEY thisWindow = n; - if (taosHashGet(pItem->expiredWindows, &thisWindow, sizeof(thisWindow)) != NULL) { + TSKEY skey = taosArrayGet(pQuerySKey, n); + if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY)) != NULL) { // TODO: mark this window as expired. } } - +#endif +#if 0 + if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) { + // TODO: mark this window as expired. + } +#endif STSmaReadH tReadH = {0}; - tsdbInitTSmaReadH(&tReadH, pTsdb, pData); + tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit); + tsdbCloseDBF(&tReadH.dFile); - tsdbInitTSmaFile(&tReadH, queryWin); + tsdbInitTSmaFile(&tReadH, querySkey); + if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) { + tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno)); + return TSDB_CODE_FAILED; + } + char smaKey[SMA_KEY_LEN] = {0}; + void *pSmaKey = &smaKey; + tsdbEncodeTSmaKey(tableUid, colId, querySkey, (void **)&pSmaKey); + + tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb), + tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), + *(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN); + + void * result = NULL; + uint32_t valueSize = 0; + if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) { + tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 + " since %s", + REPO_ID(pTsdb), indexUid, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), + *(int64_t *)POINTER_SHIFT(smaKey, 10), tstrerror(terrno)); + tsdbCloseDBF(&tReadH.dFile); + return TSDB_CODE_FAILED; + } + tfree(result); +#ifdef SMA_PRINT_DEBUG_LOG + for (uint32_t v = 0; v < valueSize; v += 8) { + tsdbWarn("vgId:%d v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v)); + } +#endif +#if 0 int32_t nResult = 0; int64_t lastKey = 0; @@ -634,8 +876,9 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow * } } } - +#endif // read data from file and fill the result + tsdbCloseDBF(&tReadH.dFile); return TSDB_CODE_SUCCESS; } @@ -673,4 +916,55 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) { // } return TSDB_CODE_SUCCESS; } -#endif \ No newline at end of file +#endif + +/** + * @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine + * + * @param pTsdb + * @param param + * @param msg + * @return int32_t + * TODO: Who is responsible for resource allocate and release? + */ +int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) { + tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) { + tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + +/** + * @brief Insert Time-range-wise Rollup Sma(RSma) data + * + * @param pTsdb + * @param param + * @param msg + * @return int32_t + */ +int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) { + tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + +int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, + tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, int32_t nMaxResult) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySkey, + nMaxResult)) < 0) { + tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 26d31af4f32541f8e9cd0a161929ca8794e2a2c2..3ccb483fe48c459b52dcd783ec62ee45ad9e4112 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -34,6 +34,7 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL); } +#if 0 /** * @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine * @@ -51,6 +52,14 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { return code; } +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) { + tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + /** * @brief Insert Time-range-wise Rollup Sma(RSma) data * @@ -65,4 +74,6 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; -} \ No newline at end of file +} + +#endif \ No newline at end of file diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index ac9a8fd3d040776c978a2c4def4732af1b669a7b..18dca33bdaf73e4ebcd95a1623960514b26667a3 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -33,7 +33,7 @@ int main(int argc, char **argv) { return RUN_ALL_TESTS(); } -TEST(testCase, tSmaEncodeDecodeTest) { +TEST(testCase, tSma_Meta_Encode_Decode_Test) { // encode STSma tSma = {0}; tSma.version = 0; @@ -87,8 +87,9 @@ TEST(testCase, tSmaEncodeDecodeTest) { tdDestroyTSma(&tSma); tdDestroyTSmaWrapper(&dstTSmaWrapper); } + #if 1 -TEST(testCase, tSma_DB_Put_Get_Del_Test) { +TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName2 = "sma_index_test_2"; const char * timezone = "Asia/Shanghai"; @@ -220,13 +221,84 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { #endif #if 1 -TEST(testCase, tSmaInsertTest) { - const int64_t indexUid = 2000000002; +TEST(testCase, tSma_Data_Insert_Query_Test) { + // step 1: prepare meta + const char * smaIndexName1 = "sma_index_test_1"; + const char * timezone = "Asia/Shanghai"; + const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; + const char * tagsFilter = "where tags.location='Beijing' and tags.district='ChaoYang'"; + const char * smaTestDir = "./smaTest"; + const tb_uid_t tbUid = 1234567890; + const int64_t indexUid1 = 2000000001; + const int64_t interval1 = 1; + const int8_t intervalUnit1 = TD_TIME_UNIT_DAY; + const uint32_t nCntTSma = 2; + TSKEY skey1 = 1646987196; + const int64_t testSmaData1 = 100; + const int64_t testSmaData2 = 200; + // encode + STSma tSma = {0}; + tSma.version = 0; + tSma.intervalUnit = TD_TIME_UNIT_DAY; + tSma.interval = 1; + tSma.slidingUnit = TD_TIME_UNIT_HOUR; + tSma.sliding = 0; + tSma.indexUid = indexUid1; + tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); + tstrncpy(tSma.timezone, timezone, TD_TIMEZONE_LEN); + tSma.tableUid = tbUid; + + tSma.exprLen = strlen(expr); + tSma.expr = (char *)calloc(tSma.exprLen + 1, 1); + tstrncpy(tSma.expr, expr, tSma.exprLen + 1); + + tSma.tagsFilterLen = strlen(tagsFilter); + tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1); + tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1); + + SMeta * pMeta = NULL; + STSma * pSmaCfg = &tSma; + const SMetaCfg *pMetaCfg = &defaultMetaOptions; + + taosRemoveDir(smaTestDir); + + pMeta = metaOpen(smaTestDir, pMetaCfg, NULL); + assert(pMeta != NULL); + // save index 1 + EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); + + // step 2: insert data STSmaDataWrapper *pSmaData = NULL; STsdb tsdb = {0}; STsdbCfg * pCfg = &tsdb.config; - pCfg->daysPerFile = 1; + tsdb.pMeta = pMeta; + tsdb.vgId = 2; + tsdb.config.daysPerFile = 10; // default days is 10 + tsdb.config.keep1 = 30; + tsdb.config.keep2 = 90; + tsdb.config.keep = 365; + tsdb.config.precision = TSDB_TIME_PRECISION_MILLI; + tsdb.config.update = TD_ROW_OVERWRITE_UPDATE; + tsdb.config.compression = TWO_STAGE_COMP; + + switch (tsdb.config.precision) { + case TSDB_TIME_PRECISION_MILLI: + skey1 *= 1e3; + break; + case TSDB_TIME_PRECISION_MICRO: + skey1 *= 1e6; + break; + case TSDB_TIME_PRECISION_NANO: + skey1 *= 1e9; + break; + default: // ms + skey1 *= 1e3; + break; + } + + char *msg = (char *)calloc(100, 1); + EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); // init int32_t allocCnt = 0; @@ -235,21 +307,21 @@ TEST(testCase, tSmaInsertTest) { void * buf = NULL; EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0); int32_t bufSize = taosTSizeof(buf); - int32_t numOfTables = 25; + int32_t numOfTables = 10; col_id_t numOfCols = 4096; EXPECT_GT(numOfCols, 0); pSmaData = (STSmaDataWrapper *)buf; printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData); - pSmaData->skey = 1646987196; - pSmaData->interval = 10; - pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE; - pSmaData->indexUid = indexUid; + pSmaData->skey = skey1; + pSmaData->interval = interval1; + pSmaData->intervalUnit = intervalUnit1; + pSmaData->indexUid = indexUid1; int32_t len = sizeof(STSmaDataWrapper); for (int32_t t = 0; t < numOfTables; ++t) { STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len); - pTbData->tableUid = t; + pTbData->tableUid = tbUid + t; int32_t tableDataLen = sizeof(STSmaTbData); for (col_id_t c = 0; c < numOfCols; ++c) { @@ -262,8 +334,17 @@ TEST(testCase, tSmaInsertTest) { } STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pSmaData, len + tableDataLen); pColData->colId = c + PRIMARYKEY_TIMESTAMP_COL_ID; - pColData->blockSize = ((c & 1) == 0) ? 8 : 16; + // TODO: fill col data + if ((c & 1) == 0) { + pColData->blockSize = 8; + memcpy(pColData->data, &testSmaData1, 8); + } else { + pColData->blockSize = 16; + memcpy(pColData->data, &testSmaData1, 8); + memcpy(POINTER_SHIFT(pColData->data, 8), &testSmaData2, 8); + } + tableDataLen += (sizeof(STSmaColData) + pColData->blockSize); } pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData)); @@ -277,8 +358,24 @@ TEST(testCase, tSmaInsertTest) { // execute EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); - // release + // step 3: query + uint32_t checkDataCnt = 0; + for (int32_t t = 0; t < numOfTables; ++t) { + for (col_id_t c = 0; c < numOfCols; ++c) { + EXPECT_EQ(tsdbGetTSmaData(&tsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t, + c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1), + TSDB_CODE_SUCCESS); + ++checkDataCnt; + } + } + + printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt); + + // release data taosTZfree(buf); + // release meta + tdDestroyTSma(&tSma); + metaClose(pMeta); } #endif diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c1cb4f8a410665fae27b4a24848c4de8ff3a7f30..f97df62ccc380773b10c4e63b2e6814ad636b531 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -349,6 +349,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")