diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 973f21c92b50570a69a4cab2b096877006e3d4e4..b9f7e6c315c9958e37d3f25ebf64d76e8049fc88 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2172,6 +2172,15 @@ static bool needRetryInsert(SSqlObj* pParentObj, int32_t numOfSub) { return true; } +static void doFreeInsertSupporter(SSqlObj* pSqlObj) { + assert(pSqlObj != NULL && pSqlObj->subState.numOfSub > 0); + + for(int32_t i = 0; i < pSqlObj->subState.numOfSub; ++i) { + SSqlObj* pSql = pSqlObj->pSubs[i]; + tfree(pSql->param); + } +} + static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) { SInsertSupporter *pSupporter = (SInsertSupporter *)param; SSqlObj* pParentObj = pSupporter->pSql; @@ -2203,10 +2212,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) if (pParentObj->res.code == TSDB_CODE_SUCCESS) { tscDebug("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); - for(int32_t i = 0; i < numOfSub; ++i) { - SSqlObj* pSql = pParentObj->pSubs[i]; - tfree(pSql->param); - } + doFreeInsertSupporter(pParentObj); // todo remove this parameter in async callback function definition. // all data has been sent to vnode, call user function @@ -2214,6 +2220,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) (*pParentObj->fp)(pParentObj->param, pParentObj, v); } else { if (!needRetryInsert(pParentObj, numOfSub)) { + doFreeInsertSupporter(pParentObj); tscQueueAsyncRes(pParentObj); return; } @@ -2244,7 +2251,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) pParentObj->cmd.parseFinished = false; pParentObj->subState.numOfRemain = numOfFailed; - pParentObj->subState.numOfSub = numOfFailed; tscResetSqlCmdObj(&pParentObj->cmd, false); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 359d0155e3267df93e97d9fc64bc088d05317c80..42bd91c388b0f9e6b0599f7c03211fdb7d6bfd28 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -703,10 +703,11 @@ static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_se return forwardStep; } -static UNUSED_FUNC void updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, int32_t* numOfClosed, TSKEY lastKey, bool ascQuery) { +static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY lastKey, bool ascQuery) { int32_t i = 0; int64_t skey = TSKEY_INITIAL_VAL; + int32_t numOfClosed = 0; for (i = 0; i < pWindowResInfo->size; ++i) { SResultRow *pResult = pWindowResInfo->pResult[i]; if (pResult->closed) { @@ -732,11 +733,11 @@ static UNUSED_FUNC void updateResultRowCurrentIndex(SResultRowInfo* pWindowResIn } pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; + return numOfClosed; } /** * NOTE: the query status only set for the first scan of master scan. - * TODO refactor */ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, SResultRowInfo *pWindowResInfo) { SQuery *pQuery = pRuntimeEnv->pQuery; @@ -755,34 +756,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe pWindowResInfo->curIndex = pWindowResInfo->size - 1; setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); } else { // set the current index to be the last unclosed window - int32_t i = 0; - int64_t skey = TSKEY_INITIAL_VAL; - - for (i = 0; i < pWindowResInfo->size; ++i) { - SResultRow *pResult = pWindowResInfo->pResult[i]; - if (pResult->closed) { - numOfClosed += 1; - continue; - } - - TSKEY ekey = pResult->win.ekey; - if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { - closeTimeWindow(pWindowResInfo, i); - } else { - skey = pResult->win.skey; - break; - } - } - - // all windows are closed, set the last one to be the skey - if (skey == TSKEY_INITIAL_VAL) { - assert(i == pWindowResInfo->size); - pWindowResInfo->curIndex = pWindowResInfo->size - 1; - } else { - pWindowResInfo->curIndex = i; - } - - pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex]->win.skey; + numOfClosed = updateResultRowCurrentIndex(pWindowResInfo, lastKey, ascQuery); // the number of completed slots are larger than the threshold, return current generated results to client. if (numOfClosed > pQuery->rec.threshold) { @@ -4501,6 +4475,18 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc } else { blockwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, searchFn, pDataBlock); } + + if (QUERY_IS_INTERVAL_QUERY(pQuery)) { + bool ascQuery = QUERY_IS_ASC_QUERY(pQuery); + + // TODO refactor + if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { + closeAllTimeWindow(pWindowResInfo); + pWindowResInfo->curIndex = pWindowResInfo->size - 1; + } else { + updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery); + } + } } bool queryHasRemainResForTableQuery(SQueryRuntimeEnv* pRuntimeEnv) {