diff --git a/include/common/taosdef.h b/include/common/taosdef.h index 69c2618ac88935e37427b97fae36a4df564f73cf..7788dfd871e3ab9ab12965d1ddf169f1480e0f38 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -50,12 +50,17 @@ typedef enum { TSDB_CHECK_ITEM_MAX } ECheckItemType; -typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2 } TDUpdateConfig; +typedef enum { TD_ROW_DISCARD_UPDATE = 0, TD_ROW_OVERWRITE_UPDATE = 1, TD_ROW_PARTIAL_UPDATE = 2} TDUpdateConfig; typedef enum { TSDB_STATIS_OK = 0, // statis part exist and load successfully TSDB_STATIS_NONE = 1, // statis part not exist } ETsdbStatisStatus; +typedef enum { + TSDB_SMA_STAT_OK = 0, // ready to provide service + TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired +} ETsdbSmaStat; + extern char *qtypeStr[]; #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 96a76ea7d4a0c7bd24b587afa7f98129f0855ac4..1451ac9685ef108cf100358e3be07150aab5db05 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -52,6 +52,7 @@ struct STsdb { STsdbFS * fs; SMeta * pMeta; STfs * pTfs; + SSmaStat * pSmaStat; }; #define REPO_ID(r) ((r)->vgId) diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index 641255a294f36da5d8b437662066d7b99ff57f66..173e99163175bf400863240dc0760f32ecbaa5ac 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -42,7 +42,10 @@ typedef struct { typedef struct { STsdbFSMeta meta; // FS meta SArray * df; // data file array - SArray * smaf; // sma data file array + + // SArray * v2f100.tsma.index_name + + SArray * smaf; // sma data file array v2f1900.tsma.index_name } SFSStatus; typedef struct { diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index e4de7a668558c66aa1db07b9eddd4624809ab188..6e4ad909ae4950ebcef51a78cde9c5a54bfc2c57 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -16,6 +16,8 @@ #ifndef _TD_TSDB_SMA_H_ #define _TD_TSDB_SMA_H_ +typedef struct SSmaStat SSmaStat; + // insert/update interface int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData); int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData); @@ -26,13 +28,14 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData); int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult); // management interface -int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void* result); +int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg); +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); +int32_t tsdbFreeSmaState(SSmaStat *pSmaStat); +// internal func - -// internal func static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { int32_t len = 0; len += taosEncodeFixedU64(pData, tableUid); diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index f49515412b28bf9ff20f2a6eeda3a4381473fa37..efdb3e0fe4cf42af0e5ca611f5687f18c5ad5216 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -923,6 +923,7 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { SMetaDB *pDB = pMeta->pDB; DBC * pCur = NULL; DBT pkey = {0}, pval = {0}; + uint32_t mode = isDup ? DB_NEXT_DUP : DB_NEXT_NODUP; int ret; pUids = taosArrayInit(16, sizeof(tb_uid_t)); @@ -941,13 +942,8 @@ SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { void *pBuf = NULL; // TODO: lock? - while (true) { - ret = pCur->get(pCur, &pkey, &pval, isDup ? DB_NEXT_DUP : DB_NEXT_NODUP); - if(ret == 0) { + while ((ret = pCur->get(pCur, &pkey, &pval, mode)) == 0) { taosArrayPush(pUids, pkey.data); - continue; - } - break; } if (pCur) { diff --git a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c new file mode 100644 index 0000000000000000000000000000000000000000..f2f48bbc8a69a022d0fc6b8a88c5a9a55d0b4ad6 --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c @@ -0,0 +1,14 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index 2d8c4701132a9c9818775654df8751b5b4a2ec75..1b3e00f090f1728864cc1654ae9cc41f675e789e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -89,6 +89,7 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, static void tsdbFree(STsdb *pTsdb) { if (pTsdb) { tsdbFreeFS(pTsdb->fs); + tsdbFreeSmaState(pTsdb->pSmaStat); tfree(pTsdb->path); free(pTsdb); } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index b465dc3a88d2e56f716cf595a391c0f0985372c4..5cfa597eb2375a4b34f22c9c7684c95a93ac9bc3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -21,6 +21,10 @@ #define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks +#define SMA_STATE_HASH_SLOT 4 +#define SMA_STATE_ITEM_HASH_SLOT 32 + +#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test typedef enum { SMA_STORAGE_LEVEL_TSDB = 0, // store TSma in dir e.g. vnode${N}/tsdb/.tsma SMA_STORAGE_LEVEL_DFILESET = 1 // store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name} @@ -48,6 +52,22 @@ typedef struct { // TODO } STSmaReadH; +typedef struct { + /** + * @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service. + * - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open, + * without information about its previous state. + * - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from + * Streaming Module or TSDB local persistence. + */ + int8_t state; // ETsdbSmaStat + SHashObj *expiredWindows; // key: skey of time window, value: N/A +} SSmaStatItem; + +struct SSmaStat { + SHashObj *smaStatItems; // key: indexName, value: SSmaStatItem +}; + // declaration of static functions static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); @@ -64,6 +84,114 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); +static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { + ASSERT(pSmaStat != NULL); + // TODO: lock and create when each put, or create during tsdbNew. + if (*pSmaStat == NULL) { + *pSmaStat = (SSmaStat *)calloc(1, sizeof(SSmaStat)); + if (*pSmaStat == NULL) { + // TODO: unlock + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + + (*pSmaStat)->smaStatItems = + taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + + if ((*pSmaStat)->smaStatItems == NULL) { + tfree(*pSmaStat); + // TODO: unlock + return TSDB_CODE_FAILED; + } + } + // TODO: unlock + return TSDB_CODE_SUCCESS; +} + +static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { + SSmaStatItem *pItem = NULL; + + pItem = (SSmaStatItem *)calloc(1, sizeof(SSmaStatItem)); + if (pItem) { + pItem->state = state; + pItem->expiredWindows = taosHashInit(SMA_STATE_ITEM_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP), + true, HASH_ENTRY_LOCK); + if (!pItem->expiredWindows) { + tfree(pItem); + } + } + return pItem; +} + +int32_t tsdbFreeSmaState(SSmaStat *pSmaStat) { + if (pSmaStat) { + // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. + SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL); + while (item != NULL) { + taosHashCleanup(item->expiredWindows); + item = taosHashIterate(pSmaStat->smaStatItems, item); + } + + taosHashCleanup(pSmaStat->smaStatItems); + free(pSmaStat); + } +} + +/** + * @brief Update expired window according to msg from stream computing module. + * + * @param pTsdb + * @param msg + * @return int32_t + */ +int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { + if (msg == NULL) { + return TSDB_CODE_FAILED; + } + + // TODO: decode the msg => start + const char * indexName = SMA_TEST_INDEX_NAME; + const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; + TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; + int64_t now = taosGetTimestampMs(); + for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { + expiredWindows[i] = now + i; + } + // TODO: decode the msg <= end + + SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems; + + SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName)); + if (!pItem) { + pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state + if (!pItem) { + // Response to stream computing: OOM + // For query, if the indexName not found, the TSDB should tell query module to query raw TS data. + return TSDB_CODE_FAILED; + } + + if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) { + // If error occurs during put smaStatItem, free the resources of pItem + taosHashCleanup(pItem->expiredWindows); + free(pItem); + return TSDB_CODE_FAILED; + } + } + + int8_t state = TSDB_SMA_STAT_EXPIRED; + for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { + if (taosHashPut(pItem->expiredWindows, &expiredWindows[i], sizeof(TSKEY), &state, sizeof(state)) != 0) { + // If error occurs during put expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would tell + // query module to query raw TS data. + taosHashCleanup(pItem->expiredWindows); + taosHashRemove(pItemsHash, indexName, sizeof(indexName)); + return TSDB_CODE_FAILED; + } + } + + return TSDB_CODE_SUCCESS; +} + /** * @brief Judge the tSma storage level *