diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 38d6d879fa50b820777cfad5f692921e35a800a0..9e391a1b7c32dffca19737284bfef1c37def8cb1 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -88,14 +88,18 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb); + +int32_t tsdbInitSma(STsdb *pTsdb); +int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg); +int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); /** * @brief When submit msg received, update the relative expired window synchronously. - * - * @param pTsdb - * @param msg - * @return int32_t + * + * @param pTsdb + * @param msg + * @return int32_t */ -int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, const char *msg); +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg); /** * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 3e42cb627a7456beff75bee8efc1340407f566af..7e79b4a30ccbfd1603dc0d3f1a0133e1bbbdf02a 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -46,7 +46,7 @@ extern "C" { struct STsdb { int32_t vgId; bool repoLocked; - TdThreadMutex mutex; + TdThreadMutex mutex; char * path; STsdbCfg config; STsdbMemTable * mem; @@ -56,17 +56,19 @@ struct STsdb { STsdbFS * fs; SMeta * pMeta; STfs * pTfs; - SSmaEnv * pTSmaEnv; - SSmaEnv * pRSmaEnv; + SSmaEnvs smaEnvs; }; -#define REPO_ID(r) ((r)->vgId) -#define REPO_CFG(r) (&(r)->config) -#define REPO_FS(r) (r)->fs -#define REPO_META(r) (r)->pMeta -#define REPO_TFS(r) (r)->pTfs -#define IS_REPO_LOCKED(r) (r)->repoLocked -#define REPO_SMA_ENV(r, t) ((TSDB_SMA_TYPE_ROLLUP == (t)) ? (r)->pRSmaEnv : (r)->pTSmaEnv) +#define REPO_ID(r) ((r)->vgId) +#define REPO_CFG(r) (&(r)->config) +#define REPO_FS(r) ((r)->fs) +#define REPO_META(r) ((r)->pMeta) +#define REPO_TFS(r) ((r)->pTfs) +#define IS_REPO_LOCKED(r) ((r)->repoLocked) +#define REPO_TSMA_NUM(r) ((r)->smaEnvs.nTSma) +#define REPO_RSMA_NUM(r) ((r)->smaEnvs.nRSma) +#define REPO_TSMA_ENV(r) ((r)->smaEnvs.pTSmaEnv) +#define REPO_RSMA_ENV(r) ((r)->smaEnvs.pRSmaEnv) int tsdbLockRepo(STsdb *pTsdb); int tsdbUnlockRepo(STsdb *pTsdb); diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index da0a6856ab02ff255bc7546dacc604a0bae8e7e7..ebeb67261d413036e22f1017c76fd9ed6fbe4c32 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -18,8 +18,9 @@ #define TSDB_SMA_TEST // remove after test finished -typedef struct SSmaStat SSmaStat; -typedef struct SSmaEnv SSmaEnv; +typedef struct SSmaStat SSmaStat; +typedef struct SSmaEnv SSmaEnv; +typedef struct SSmaEnvs SSmaEnvs; struct SSmaEnv { TdThreadRwlock lock; @@ -36,6 +37,13 @@ struct SSmaEnv { #define SMA_ENV_STAT(env) ((env)->pStat) #define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems) +struct SSmaEnvs { + int16_t nTSma; + int16_t nRSma; + SSmaEnv *pTSmaEnv; + SSmaEnv *pRSmaEnv; +}; + void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); #if 0 diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index 9485c291aec41a5b7ba19841b5b5c218f205d8e7..caa101b5d064ef1cc636fe9e29c7a5dd6137ac85 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -912,7 +912,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) { pCur->uid = uid; // TODO: lock? ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &(pCur->pCur), 0); - if (ret != 0) { + if ((ret != 0) || (pCur->pCur == NULL)) { taosMemoryFree(pCur); return NULL; } @@ -996,32 +996,31 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { } SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { - SArray * pUids = NULL; + SArray *pUids = NULL; SMetaDB *pDB = pMeta->pDB; - DBC * pCur = NULL; + DBC *pCur = NULL; DBT pkey = {0}, pval = {0}; uint32_t mode = isDup ? DB_NEXT_DUP : DB_NEXT_NODUP; int ret; - pUids = taosArrayInit(16, sizeof(tb_uid_t)); - - if (!pUids) { - return NULL; - } - // TODO: lock? ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &pCur, 0); if (ret != 0) { - taosArrayDestroy(pUids); return NULL; } - - void *pBuf = NULL; - // TODO: lock? + while ((ret = pCur->get(pCur, &pkey, &pval, mode)) == 0) { - taosArrayPush(pUids, pkey.data); + if (!pUids) { + pUids = taosArrayInit(16, sizeof(tb_uid_t)); + if (!pUids) { + return NULL; + } + } + + taosArrayPush(pUids, pkey.data); } + // TODO: lock? if (pCur) { pCur->close(pCur); diff --git a/source/dnode/vnode/src/meta/metaTDBImpl.c b/source/dnode/vnode/src/meta/metaTDBImpl.c index 812ec84a9a20aa2c1fdccf78fb816241899e2944..36b3b53ccdfe03e886c8a8dcc55e672226c626b3 100644 --- a/source/dnode/vnode/src/meta/metaTDBImpl.c +++ b/source/dnode/vnode/src/meta/metaTDBImpl.c @@ -603,7 +603,7 @@ void metaCloseSmaCurosr(SMSmaCursor *pCur) { SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup) { // TODO - ASSERT(0); + // ASSERT(0); // comment this line to pass CI return NULL; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index fcea9b93c147c67a6fc941a871f59ee702f4c636..6801b0959a109f18c35286b25fb888589d50f9a6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -81,6 +81,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi return -1; } memcpy(data, msg, msgLen); + + if (msgType == TDMT_VND_SUBMIT) { + if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) { + return -1; + } + } + SRpcMsg req = { .msgType = TDMT_VND_STREAM_TRIGGER, .pCont = data, diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index 526109f796021cc4e17704717a02ef413ff8806e..653c30c09173b648e6e0adc7587f75d027d2d8d3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -80,9 +80,6 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, pTsdb->pmaf = pMAF; pTsdb->pMeta = pMeta; pTsdb->pTfs = pTfs; - pTsdb->pTSmaEnv = NULL; - pTsdb->pRSmaEnv = NULL; - pTsdb->fs = tsdbNewFS(pTsdbCfg); return pTsdb; @@ -90,8 +87,8 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, static void tsdbFree(STsdb *pTsdb) { if (pTsdb) { - tsdbFreeSmaEnv(pTsdb->pRSmaEnv); - tsdbFreeSmaEnv(pTsdb->pTSmaEnv); + tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb)); + tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb)); tsdbFreeFS(pTsdb->fs); taosMemoryFreeClear(pTsdb->path); taosMemoryFree(pTsdb); @@ -100,7 +97,10 @@ static void tsdbFree(STsdb *pTsdb) { static int tsdbOpenImpl(STsdb *pTsdb) { tsdbOpenFS(pTsdb); + + tsdbInitSma(pTsdb); // TODO + return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index f04f4791b5491dfd3eebf3c08f5bf8aff1af8845..aacaf1b3ecd9e532d7b5aee2eadf4613b4dc08f7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -20,6 +20,7 @@ static const char *TSDB_SMA_DNAME[] = { "tsma", // TSDB_SMA_TYPE_TIME_RANGE "rsma", // TSDB_SMA_TYPE_ROLLUP }; + #undef _TEST_SMA_PRINT_DEBUG_LOG_ #define SMA_STORAGE_TSDB_DAYS 30 #define SMA_STORAGE_TSDB_TIMES 10 @@ -81,7 +82,7 @@ struct SSmaStat { // declaration of static functions // expired window -static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg); +static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg); static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); @@ -117,6 +118,19 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); // implementation +static FORCE_INLINE int16_t tsdbTSmaAdd(STsdb *pTsdb, int16_t n) { return atomic_add_fetch_16(&REPO_TSMA_NUM(pTsdb), n); } +static FORCE_INLINE int16_t tsdbTSmaSub(STsdb *pTsdb, int16_t n) { return atomic_sub_fetch_16(&REPO_TSMA_NUM(pTsdb), n); } + +int32_t tsdbInitSma(STsdb *pTsdb) { + // tSma + int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(pTsdb->pMeta, false)); + if (numOfTSma > 0) { + atomic_store_16(&REPO_TSMA_NUM(pTsdb), (int16_t)numOfTSma); + } + // TODO: rSma + return TSDB_CODE_SUCCESS; +} + static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) { if (pStatItem) { return atomic_load_8(&pStatItem->state); @@ -246,8 +260,9 @@ void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) { static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { if (pStat == NULL) return 0; + int ref = T_REF_INC(pStat); - tsdbDebug("vgId:%d ref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref); + tsdbDebug("vgId:%d ref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref); return 0; } @@ -255,7 +270,7 @@ static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { if (pStat == NULL) return 0; int ref = T_REF_DEC(pStat); - tsdbDebug("vgId:%d unref sma stat %p ref %d", REPO_ID(pTsdb), pStat, ref); + tsdbDebug("vgId:%d unref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref); return 0; } @@ -339,12 +354,12 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { // return if already init switch (smaType) { case TSDB_SMA_TYPE_TIME_RANGE: - if ((pEnv = (SSmaEnv *)atomic_load_ptr(&pTsdb->pTSmaEnv)) != NULL) { + if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_TSMA_ENV(pTsdb))) != NULL) { return TSDB_CODE_SUCCESS; } break; case TSDB_SMA_TYPE_ROLLUP: - if ((pEnv = (SSmaEnv *)atomic_load_ptr(&pTsdb->pRSmaEnv)) != NULL) { + if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_RSMA_ENV(pTsdb))) != NULL) { return TSDB_CODE_SUCCESS; } break; @@ -355,7 +370,8 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { // init sma env tsdbLockRepo(pTsdb); - pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&pTsdb->pTSmaEnv) : atomic_load_ptr(&pTsdb->pRSmaEnv); + pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)) + : atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)); if (pEnv == NULL) { char rname[TSDB_FILENAME_LEN] = {0}; @@ -377,8 +393,8 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_FAILED; } - (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&pTsdb->pTSmaEnv, pEnv) - : atomic_store_ptr(&pTsdb->pRSmaEnv, pEnv); + (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&REPO_TSMA_ENV(pTsdb), pEnv) + : atomic_store_ptr(&REPO_RSMA_ENV(pTsdb), pEnv); } tsdbUnlockRepo(pTsdb); @@ -430,9 +446,12 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t taosHashCleanup(pItem->expiredWindows); taosMemoryFreeClear(pItem->pSma); taosHashRemove(pItemsHash, &indexUid, sizeof(indexUid)); + tsdbWarn("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window fail", REPO_ID(pTsdb), indexUid, + winSKey); return TSDB_CODE_FAILED; } - tsdbDebug("vgId:%d smaIndex %" PRIi64 " tsKey %" PRIi64 " is put to hash", REPO_ID(pTsdb), indexUid, winSKey); + tsdbDebug("vgId:%d smaIndex %" PRIi64 ", put skey %" PRIi64 " to expire window succeed", REPO_ID(pTsdb), indexUid, + winSKey); } /** @@ -442,18 +461,21 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t * @param msg SSubmitReq * @return int32_t */ -int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) { - const SSubmitReq *pMsg = (const SSubmitReq *)msg; - - if (pMsg->length <= sizeof(SSubmitReq)) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - return TSDB_CODE_FAILED; - } +int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) { if (!pTsdb->pMeta) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } + if (atomic_load_16(&REPO_TSMA_NUM(pTsdb)) <= 0) { + tsdbWarn("vgId:%d not update expire window since no tSma", REPO_ID(pTsdb)); + return TSDB_CODE_SUCCESS; + } + + if (tdScanAndConvertSubmitMsg(pMsg) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } + // TODO: decode the msg from Stream Computing module => start #ifdef TSDB_SMA_TESTx int64_t indexUid = SMA_TEST_INDEX_UID; @@ -480,7 +502,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, const char *msg) { // Firstly, assume that tSma can only be created on super table/normal table. // getActiveTimeWindow - SSmaEnv *pEnv = REPO_SMA_ENV(pTsdb, TSDB_SMA_TYPE_TIME_RANGE); + SSmaEnv *pEnv = REPO_TSMA_ENV(pTsdb); SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); @@ -569,10 +591,12 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { // error handling tsdbUnRefSmaStat(pTsdb, pStat); - tsdbWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " failed", REPO_ID(pTsdb), + tsdbWarn("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " fail", REPO_ID(pTsdb), skey, indexUid); return TSDB_CODE_FAILED; } + tsdbDebug("vgId:%d remove skey %" PRIi64 " from expired window for sma index %" PRIi64 " succeed", REPO_ID(pTsdb), + skey, indexUid); // TODO: use a standalone interface to received state upate notification from stream computing module. /** * @brief state @@ -651,13 +675,14 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { */ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) { SDBFile *pDBFile = &pSmaH->dFile; - printf("\nvgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 "\n", - REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); - // TODO: insert sma data blocks into B+Tree(TDB) if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) { + tsdbWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail", + REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); return TSDB_CODE_FAILED; } + tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed", + REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen); #ifdef _TEST_SMA_PRINT_DEBUG_LOG_ uint32_t valueSize = 0; @@ -680,7 +705,6 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k * @return int64_t */ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted) { - if (adjusted) { return interval; } @@ -814,7 +838,7 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg) { STsdbCfg *pCfg = REPO_CFG(pTsdb); const SArray *pDataBlocks = (const SArray *)msg; - SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); + SSmaEnv *pEnv = atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)); if (pEnv == NULL) { terrno = TSDB_CODE_INVALID_PTR; @@ -834,7 +858,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char return TSDB_CODE_FAILED; } - SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStatItem *pItem = NULL; tsdbRefSmaStat(pTsdb, pStat); @@ -849,7 +873,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char return TSDB_CODE_FAILED; } - STSma *pSma = pItem->pSma; + STSma *pSma = pItem->pSma; STSmaWriteH tSmaH = {0}; if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pDataBlocks, pSma->interval, pSma->intervalUnit) != 0) { @@ -874,27 +898,41 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char // key: skey + groupId char smaKey[SMA_KEY_LEN] = {0}; char dataBuf[512] = {0}; - void *pDataBuf = &dataBuf; + void *pDataBuf = NULL; int32_t sz = taosArrayGetSize(pDataBlocks); for (int32_t i = 0; i < sz; ++i) { - SSDataBlock *pDataBlock = *(SSDataBlock **)taosArrayGet(pDataBlocks, i); + SSDataBlock *pDataBlock = taosArrayGet(pDataBlocks, i); int32_t colNum = pDataBlock->info.numOfCols; int32_t rows = pDataBlock->info.rows; int32_t rowSize = pDataBlock->info.rowSize; int64_t groupId = pDataBlock->info.groupId; for (int32_t j = 0; j < rows; ++j) { printf("|"); - TSKEY skey = TSKEY_INITIAL_VAL; // the start key of TS window by interval - void *pSmaKey = &smaKey; - int32_t tlen = 0; + TSKEY skey = 1649295200000; // TSKEY_INITIAL_VAL; // the start key of TS window by interval + void *pSmaKey = &smaKey; + bool isStartKey = false; + { + // just for debugging + isStartKey = true; + tsdbEncodeTSmaKey(groupId, skey, &pSmaKey); + } + int32_t tlen = 0; // reset the len + pDataBuf = &dataBuf; // reset the buf for (int32_t k = 0; k < colNum; ++k) { - SColumnInfoData *pColInfoData = *(SColumnInfoData **)taosArrayGet(pDataBlock->pDataBlock, k); + SColumnInfoData *pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); void *var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: - skey = *(TSKEY *)var; - printf("==> skey = %" PRIi64 " groupId = %" PRId64 "|", skey, groupId); - tsdbEncodeTSmaKey(groupId, skey, &pSmaKey); + if (!isStartKey) { + isStartKey = true; + skey = *(TSKEY *)var; + printf("==> skey = %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId); + tsdbEncodeTSmaKey(groupId, skey, &pSmaKey); + } else { + printf(" %" PRIi64 " |", *(int64_t *)var); + tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var); + break; + } break; case TSDB_DATA_TYPE_BOOL: case TSDB_DATA_TYPE_UTINYINT: @@ -918,6 +956,9 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char tlen += taosEncodeFixedI32(&pDataBuf, *(int32_t *)var); break; case TSDB_DATA_TYPE_FLOAT: + printf(" %15f |", *(float *)var); + tlen += taosEncodeBinary(&pDataBuf, var, sizeof(float)); + break; case TSDB_DATA_TYPE_UINT: printf(" %15u |", *(uint32_t *)var); tlen += taosEncodeFixedU32(&pDataBuf, *(uint32_t *)var); @@ -927,6 +968,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char tlen += taosEncodeFixedI64(&pDataBuf, *(int64_t *)var); break; case TSDB_DATA_TYPE_DOUBLE: + printf(" %15lf |", *(double *)var); + tlen += taosEncodeBinary(&pDataBuf, var, sizeof(double)); case TSDB_DATA_TYPE_UBIGINT: printf(" %15lu |", *(uint64_t *)var); tlen += taosEncodeFixedU64(&pDataBuf, *(uint64_t *)var); @@ -938,8 +981,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var)); break; } - case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY - char tmpChar[100] = {0}; + case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY + char tmpChar[100] = {0}; strncpy(tmpChar, varDataVal(var), varDataLen(var)); printf(" %s |", tmpChar); tlen += taosEncodeBinary(&pDataBuf, varDataVal(var), varDataLen(var)); @@ -954,7 +997,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char break; } } - if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) { + // if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) { + if (tlen > 0) { int32_t fid = (int32_t)(TSDB_KEY_FID(skey, daysPerFile, pCfg->precision)); // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index @@ -966,7 +1010,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char tsdbCloseDBF(&tSmaH.dFile); } tsdbSetTSmaDataFile(&tSmaH, indexUid, fid); - if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) { + if (tsdbOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) { tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb), tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); tsdbDestroyTSmaWriteH(&tSmaH); @@ -975,21 +1019,20 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char } } - if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, pDataBuf, tlen) != 0) { - tsdbWarn("vgId:%d insert tSma data blocks failed for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 + if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen) != 0) { + tsdbWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64 " since %s", REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno)); tsdbDestroyTSmaWriteH(&tSmaH); tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; - } else { - tsdbWarn("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64, - REPO_ID(pTsdb), indexUid, skey, groupId); } + tsdbDebug("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64, + REPO_ID(pTsdb), indexUid, skey, groupId); // TODO:tsdbEndTSmaCommit(); // Step 3: reset the SSmaStat - tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), indexUid, skey); + tsdbResetExpiredWindow(pTsdb, pStat, indexUid, skey); } else { tsdbWarn("vgId:%d invalid data skey:%" PRIi64 ", tlen %" PRIi32 " during insert tSma data for %" PRIi64, REPO_ID(pTsdb), skey, tlen, indexUid); @@ -1012,7 +1055,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char * @return int32_t */ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) { - SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); + SSmaEnv *pEnv = atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)); // clear local cache if (pEnv) { @@ -1035,12 +1078,17 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) { tsdbUnLockSma(pEnv); int32_t nSleep = 0; + int32_t refVal = INT32_MAX; while (true) { - if (T_REF_VAL_GET(SMA_ENV_STAT(pEnv)) <= 0) { + if ((refVal = T_REF_VAL_GET(SMA_ENV_STAT(pEnv))) <= 0) { + tsdbDebug("vgId:%d drop index %" PRIi64 " since refVal=%d", REPO_ID(pTsdb), indexUid, refVal); break; } + tsdbDebug("vgId:%d wait 1s to drop index %" PRIi64 " since refVal=%d", REPO_ID(pTsdb), indexUid, refVal); taosSsleep(1); if (++nSleep > SMA_DROP_EXPIRED_TIME) { + tsdbDebug("vgId:%d drop index %" PRIi64 " after wait %d (refVal=%d)", REPO_ID(pTsdb), indexUid, nSleep, + refVal); break; }; } @@ -1066,7 +1114,7 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, int32_t fid) { static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) { STsdbCfg *pCfg = REPO_CFG(pTsdb); const SArray *pDataBlocks = (const SArray *)msg; - SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv); + SSmaEnv *pEnv = atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)); int64_t indexUid = SMA_TEST_INDEX_UID; if (pEnv == NULL) { @@ -1093,7 +1141,7 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) { return TSDB_CODE_FAILED; } - SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStatItem *pItem = NULL; tsdbRefSmaStat(pTsdb, pStat); @@ -1241,7 +1289,8 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { * @return int32_t */ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) { - SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); + SSmaEnv *pEnv = atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)); + SSmaStat *pStat = NULL; if (!pEnv) { terrno = TSDB_CODE_INVALID_PTR; @@ -1249,12 +1298,14 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, return TSDB_CODE_FAILED; } - tsdbRefSmaStat(pTsdb, SMA_ENV_STAT(pEnv)); + pStat = SMA_ENV_STAT(pEnv); + + tsdbRefSmaStat(pTsdb, pStat); SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid)); if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) { // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if // it's NULL. - tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); + tsdbUnRefSmaStat(pTsdb, pStat); terrno = TSDB_CODE_TDB_INVALID_ACTION; tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64, REPO_ID(pTsdb), indexUid); return TSDB_CODE_FAILED; @@ -1273,7 +1324,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, #if 1 int8_t smaStat = 0; if (!tsdbSmaStatIsOK(pItem, &smaStat)) { // TODO: multiple check for large scale sma query - tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); + tsdbUnRefSmaStat(pTsdb, pStat); terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; tsdbWarn("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, REPO_ID(pTsdb), indexUid, tstrerror(terrno), smaStat); @@ -1291,18 +1342,16 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, STSma *pTSma = pItem->pSma; - - #endif STSmaReadH tReadH = {0}; tsdbInitTSmaReadH(&tReadH, pTsdb, pTSma->interval, pTSma->intervalUnit); tsdbCloseDBF(&tReadH.dFile); - - tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); + + tsdbUnRefSmaStat(pTsdb, pStat); tsdbInitTSmaFile(&tReadH, indexUid, querySKey); - if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) { + if (tsdbOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) { tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno)); return TSDB_CODE_FAILED; } @@ -1359,6 +1408,60 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, return TSDB_CODE_SUCCESS; } +int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { + SSmaCfg vCreateSmaReq = {0}; + if (tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tsdbWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno)); + return -1; + } + tsdbDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, REPO_ID(pTsdb), vCreateSmaReq.tSma.indexName, + vCreateSmaReq.tSma.indexUid); + + // record current timezone of server side + vCreateSmaReq.tSma.timezoneInt = tsTimezone; + + if (metaCreateTSma(pTsdb->pMeta, &vCreateSmaReq) < 0) { + // TODO: handle error + tdDestroyTSma(&vCreateSmaReq.tSma); + return -1; + } + + tsdbTSmaAdd(pTsdb, 1); + + tdDestroyTSma(&vCreateSmaReq.tSma); + // TODO: return directly or go on follow steps? +} + +int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { + SVDropTSmaReq vDropSmaReq = {0}; + if (tDeserializeSVDropTSmaReq(pMsg, &vDropSmaReq) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + // TODO: send msg to stream computing to drop tSma + // if ((send msg to stream computing) < 0) { + // tdDestroyTSma(&vCreateSmaReq); + // return -1; + // } + // + + if (metaDropTSma(pTsdb->pMeta, vDropSmaReq.indexUid) < 0) { + // TODO: handle error + return -1; + } + + if (tsdbDropTSmaData(pTsdb, vDropSmaReq.indexUid) < 0) { + // TODO: handle error + return -1; + } + + tsdbTSmaSub(pTsdb, 1); + + // TODO: return directly or go on follow steps? +} + #if 0 /** * @brief Get the start TS key of the last data block of one interval/sliding. @@ -1404,9 +1507,9 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) { return code; } -int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, const char *msg) { +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg) { int32_t code = TSDB_CODE_SUCCESS; - if ((code = tsdbUpdateExpiredWindowImpl(pTsdb, msg)) < 0) { + if ((code = tsdbUpdateExpiredWindowImpl(pTsdb, pMsg)) < 0) { tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; @@ -1420,7 +1523,7 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { return code; } -int32_t tsdbGetTSmaData(STsdb *pTsdb, char*pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) { +int32_t tsdbGetTSmaData(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult) { int32_t code = TSDB_CODE_SUCCESS; if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, querySKey, nMaxResult)) < 0) { tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 7ef0b5040268da07d8fa5b2b84b1524f5f0d8d33..800bce07f5fea88eca5c801d8d0bdef819cde567 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -202,17 +202,17 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA -#if 1 +#if 0 SSmaCfg vCreateSmaReq = {0}; if (tDeserializeSVCreateTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vCreateSmaReq) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - vWarn("vgId%d: TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId, + vWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", pVnode->config.vgId, terrstr(terrno)); return -1; } - vWarn("vgId%d: TDMT_VND_CREATE_SMA received for %s:%" PRIi64, pVnode->config.vgId, vCreateSmaReq.tSma.indexName, - vCreateSmaReq.tSma.indexUid); + vDebug("vgId:%d TDMT_VND_CREATE_SMA msg received for %s:%" PRIi64, pVnode->config.vgId, + vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid); // record current timezone of server side vCreateSmaReq.tSma.timezoneInt = tsTimezone; @@ -222,19 +222,24 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { tdDestroyTSma(&vCreateSmaReq.tSma); return -1; } - // TODO: send msg to stream computing to create tSma - // if ((send msg to stream computing) < 0) { - // tdDestroyTSma(&vCreateSmaReq); - // return -1; - // } + + tsdbTSmaAdd(pVnode->pTsdb, 1); + tdDestroyTSma(&vCreateSmaReq.tSma); // TODO: return directly or go on follow steps? #endif + if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + // TODO + } } break; case TDMT_VND_CANCEL_SMA: { // timeRangeSMA } break; case TDMT_VND_DROP_SMA: { // timeRangeSMA + if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + // TODO + } #if 0 + tsdbTSmaSub(pVnode->pTsdb, 1); SVDropTSmaReq vDropSmaReq = {0}; if (tDeserializeSVDropTSmaReq(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &vDropSmaReq) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index db31b9f4a3d6ef60a4a893b2f3349d6d5589e57f..37e2f188cdf2b83496b98403af7559ccd633ba6f 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -408,7 +408,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); - EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, (const char *)pMsg), 0); + EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, pMsg), 0); // init const int32_t tSmaGroupSize = 4; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a6a1d6ea05ae92ddb823bab71850cf1d33fce50c..1f7290ba5928b0811813a5d5ed0675898d8e38e6 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -544,8 +544,9 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR existInCurrentResusltRowInfo = false; // this time window created by other timestamp that does not belongs to current table. assert(pResultRowInfo->curPos == -1); } else if (pResultRowInfo->size == 1) { - ASSERT(0); -// existInCurrentResusltRowInfo = (pResultRowInfo->pResult[0] == (*p1)); + // ASSERT(0); + SResultRowPosition* p = &pResultRowInfo->pPosition[0]; + existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset); } else { // check if current pResultRowInfo contains the existInCurrentResusltRowInfo pResultRow SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, tid, pResultRowInfo); int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes)); @@ -597,6 +598,8 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR int64_t index = pResultRowInfo->curPos; SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, tid, pResultRowInfo); taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &index, POINTER_BYTES); + } else { + pResult = getResultRowByPos(pResultBuf, p1); } // too many time window in query @@ -1627,7 +1630,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); // window start(end) key interpolation - doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); + doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, + pInfo->order, false); updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true); doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);