From c59478ec71cd84f9ec5624c3147100f89e6e4ee5 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 11 Jul 2022 14:20:57 +0800 Subject: [PATCH] enh: tsma/rsma code optimization --- source/dnode/mnode/impl/src/mndSma.c | 14 ++++++++++++-- source/libs/executor/src/scanoperator.c | 12 ++++++------ source/libs/parser/src/parTranslater.c | 11 +++++++++-- tests/script/jenkins/basic.txt | 4 ++-- 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 388441b1dc..c1513cd92f 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -527,10 +527,20 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea streamObj.version = 1; streamObj.sql = pCreate->sql; streamObj.smaId = smaObj.uid; - streamObj.watermark = 0; - streamObj.trigger = STREAM_TRIGGER_AT_ONCE; + streamObj.watermark = pCreate->watermark; + streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE; + streamObj.triggerParam = pCreate->maxDelay; streamObj.ast = strdup(smaObj.ast); + // check the maxDelay + if (streamObj.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) { + int64_t msInterval = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND); + streamObj.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY; + } + if (streamObj.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) { + streamObj.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY; + } + if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) { mError("sma:%s, failed to create since %s", smaObj.name, terrstr()); return -1; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 407f799496..0194cd78dc 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1134,12 +1134,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.type = STREAM_NORMAL; pInfo->pRes->info.capacity = pBlock->info.rows; - // for generating rollup SMA result, each time is an independent time serie. - // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this - if (pInfo->assignBlockUid) { - pInfo->pRes->info.groupId = pBlock->info.uid; - } - uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); if (groupIdPre) { pInfo->pRes->info.groupId = *groupIdPre; @@ -1147,6 +1141,12 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock pInfo->pRes->info.groupId = 0; } + // for generating rollup SMA result, each time is an independent time serie. + // TODO temporarily used, when the statement of "partition by tbname" is ready, remove this + if (pInfo->assignBlockUid) { + pInfo->pRes->info.groupId = pBlock->info.uid; + } + // todo extract method for (int32_t i = 0; i < taosArrayGetSize(pInfo->pColMatchInfo); ++i) { SColMatchInfo* pColMatchInfo = taosArrayGet(pInfo->pColMatchInfo, i); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f92713565b..b685db1f0e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4017,8 +4017,15 @@ static int32_t buildCreateSmaReq(STranslateContext* pCxt, SCreateIndexStmt* pStm (NULL != pStmt->pOptions->pSliding ? ((SValueNode*)pStmt->pOptions->pSliding)->unit : pReq->intervalUnit); if (NULL != pStmt->pOptions->pStreamOptions) { SStreamOptions* pStreamOpt = (SStreamOptions*)pStmt->pOptions->pStreamOptions; - pReq->maxDelay = (NULL != pStreamOpt->pDelay ? ((SValueNode*)pStreamOpt->pDelay)->datum.i : 0); - pReq->watermark = (NULL != pStreamOpt->pWatermark ? ((SValueNode*)pStreamOpt->pWatermark)->datum.i : 0); + pReq->maxDelay = (NULL != pStreamOpt->pDelay ? ((SValueNode*)pStreamOpt->pDelay)->datum.i : -1); + pReq->watermark = (NULL != pStreamOpt->pWatermark ? ((SValueNode*)pStreamOpt->pWatermark)->datum.i + : TSDB_DEFAULT_ROLLUP_WATERMARK); + if (pReq->watermark < TSDB_MIN_ROLLUP_WATERMARK) { + pReq->watermark = TSDB_MIN_ROLLUP_WATERMARK; + } + if (pReq->watermark > TSDB_MAX_ROLLUP_WATERMARK) { + pReq->watermark = TSDB_MAX_ROLLUP_WATERMARK; + } } int32_t code = getSmaIndexDstVgId(pCxt, pStmt->tableName, &pReq->dstVgId); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 94414edbf2..6826258151 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -164,8 +164,8 @@ # --- sma ./test.sh -f tsim/sma/drop_sma.sim ./test.sh -f tsim/sma/tsmaCreateInsertQuery.sim -#./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim -#./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim +./test.sh -f tsim/sma/rsmaCreateInsertQuery.sim +./test.sh -f tsim/sma/rsmaPersistenceRecovery.sim # --- valgrind ./test.sh -f tsim/valgrind/checkError1.sim -- GitLab