From 744a198906ca1156791456e15f27774c2877795c Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 13 Apr 2022 20:02:11 +0800 Subject: [PATCH] add version for tsma expired window updating --- source/dnode/vnode/inc/vnode.h | 5 +-- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tsdb/tsdbSma.c | 47 +++++++++---------------- source/dnode/vnode/test/tsdbSmaTest.cpp | 2 +- 4 files changed, 22 insertions(+), 34 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index aab835b958..a93e1741bb 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -393,10 +393,11 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); * @brief When submit msg received, update the relative expired window synchronously. * * @param pTsdb - * @param msg + * @param pMsg + * @param version * @return int32_t */ -int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg); +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); /** * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bee61f4d40..d7cde44a6f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -82,7 +82,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi memcpy(data, msg, msgLen); if (msgType == TDMT_VND_SUBMIT) { - if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) { + if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg, version) != 0) { return -1; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index bfdad836f1..7de5a0d5a9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -105,8 +105,8 @@ struct SSmaStat { // declaration of static functions // expired window -static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg); -static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey); +static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); +static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); @@ -544,7 +544,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_SUCCESS; }; -static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey) { +static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) { SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); if (pItem == NULL) { // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later @@ -578,8 +578,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t return TSDB_CODE_FAILED; } - int8_t state = TSDB_SMA_STAT_EXPIRED; - if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &state, sizeof(state)) != 0) { + if (taosHashPut(pItem->expiredWindows, &winSKey, sizeof(TSKEY), &version, sizeof(version)) != 0) { // If error occurs during taosHashPut expired windows, remove the smaIndex from pTsdb->pSmaStat, thus TSDB would // tell query module to query raw TS data. // N.B. @@ -606,7 +605,8 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t * @param msg SSubmitReq * @return int32_t */ -int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) { +int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version) { + // no time-range-sma, just return success if (atomic_load_16(&REPO_TSMA_NUM(pTsdb)) <= 0) { tsdbTrace("vgId:%d not update expire window since no tSma", REPO_ID(pTsdb)); return TSDB_CODE_SUCCESS; @@ -621,20 +621,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) { return TSDB_CODE_FAILED; } -// TODO: decode the msg from Stream Computing module => start -#ifdef TSDB_SMA_TESTx - int64_t indexUid = SMA_TEST_INDEX_UID; - const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; - TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; - TSKEY skey1 = 1646987196 * 1e3; - for (int32_t i = 0; i < SMA_TEST_EXPIRED_WINDOW_SIZE; ++i) { - expiredWindows[i] = skey1 + i; - } -#else - -#endif - // TODO: decode the msg <= end - if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INIT_FAILED; return TSDB_CODE_FAILED; @@ -700,7 +686,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg) { TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision); - tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey); + tsdbSetExpiredWindow(pTsdb, pItemsHash, pTSma->indexUid, winSKey, version); // TODO: release only when suid changes. tdDestroyTSmaWrapper(pSW); @@ -975,7 +961,7 @@ static int tsdbSmaBeginCommit(SSmaEnv *pEnv) { // start a new txn tdbTxnOpen(pTxn, 0, poolMalloc, poolFree, pEnv->pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); if (tdbBegin(pEnv->dbEnv, pTxn) != 0) { - tsdbWarn("tsdbSma tdb restart txn fail"); + tsdbWarn("tsdbSma tdb begin commit fail"); return -1; } return 0; @@ -986,7 +972,7 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) { // Commit current txn if (tdbCommit(pEnv->dbEnv, pTxn) != 0) { - tsdbWarn("tsdbSma tdb commit fail"); + tsdbWarn("tsdbSma tdb end commit fail"); return -1; } tdbTxnClose(pTxn); @@ -1009,12 +995,12 @@ static int tsdbSmaEndCommit(SSmaEnv *pEnv) { static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg) { STsdbCfg *pCfg = REPO_CFG(pTsdb); const SArray *pDataBlocks = (const SArray *)msg; - SSmaEnv *pEnv = atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)); - if (pEnv == NULL) { - terrno = TSDB_CODE_INVALID_PTR; - tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); - return terrno; + // For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus + // the sma data would arrive ahead of the update-expired-window msg. + if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_TDB_INIT_FAILED; + return TSDB_CODE_FAILED; } if (pDataBlocks == NULL) { @@ -1029,6 +1015,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char return TSDB_CODE_FAILED; } + SSmaEnv *pEnv = REPO_TSMA_ENV(pTsdb); SSmaStat *pStat = SMA_ENV_STAT(pEnv); SSmaStatItem *pItem = NULL; @@ -1683,9 +1670,9 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) { return code; } -int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg) { +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version) { int32_t code = TSDB_CODE_SUCCESS; - if ((code = tsdbUpdateExpiredWindowImpl(pTsdb, pMsg)) < 0) { + if ((code = tsdbUpdateExpiredWindowImpl(pTsdb, pMsg, version)) < 0) { tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index da874716f2..b0217a0462 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -409,7 +409,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); - EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, pMsg), 0); + EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, pMsg, 0), 0); // init const int32_t tSmaGroupSize = 4; -- GitLab