From c303ff2ab9e6fc69e4a60df935111cfef71a30db Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 31 Jan 2020 15:37:32 +0800 Subject: [PATCH] refactor some codes [tbase-266] --- src/client/src/tscFunctionImpl.c | 56 ++-- src/system/detail/inc/vnodeQueryImpl.h | 8 +- src/system/detail/inc/vnodeRead.h | 11 +- src/system/detail/src/vnodeQueryImpl.c | 369 ++++++++++++---------- src/system/detail/src/vnodeQueryProcess.c | 110 ++++--- src/system/detail/src/vnodeRead.c | 4 +- 6 files changed, 290 insertions(+), 268 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 34b6666047..4059cb60cb 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -72,6 +72,8 @@ for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {} +void doFinalizer(SQLFunctionCtx *pCtx) { resetResultInfo(GET_RES_INFO(pCtx)); } + typedef struct tValuePair { tVariant v; int64_t timestamp; @@ -355,8 +357,8 @@ static void function_finalizer(SQLFunctionCtx *pCtx) { pTrace("no result generated, result is set to NULL"); setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); } - - resetResultInfo(GET_RES_INFO(pCtx)); + + doFinalizer(pCtx); } /* @@ -889,6 +891,7 @@ static void avg_finalizer(SQLFunctionCtx *pCtx) { // cannot set the numOfIteratedElems again since it is set during previous iteration GET_RES_INFO(pCtx)->numOfRes = 1; + doFinalizer(pCtx); } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -1433,8 +1436,8 @@ static void stddev_finalizer(SQLFunctionCtx *pCtx) { *retValue = sqrt(pStd->res / pStd->num); SET_VAL(pCtx, 1, 1); } - - resetResultInfo(GET_RES_INFO(pCtx)); + + doFinalizer(pCtx); } ////////////////////////////////////////////////////////////////////////////////////// @@ -1836,7 +1839,7 @@ static void last_row_finalizer(SQLFunctionCtx *pCtx) { } GET_RES_INFO(pCtx)->numOfRes = 1; - resetResultInfo(GET_RES_INFO(pCtx)); + doFinalizer(pCtx); } ////////////////////////////////////////////////////////////////////////////////// @@ -2404,8 +2407,8 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { GET_TRUE_DATA_TYPE(); copyTopBotRes(pCtx, type); - - resetResultInfo(pResInfo); + + doFinalizer(pCtx); } /////////////////////////////////////////////////////////////////////////////////////////////// @@ -2481,8 +2484,8 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { tOrderDescDestroy(pMemBucket->pOrderDesc); tMemBucketDestroy(pMemBucket); - - resetResultInfo(GET_RES_INFO(pCtx)); + + doFinalizer(pCtx); } ////////////////////////////////////////////////////////////////////////////////// @@ -2690,8 +2693,8 @@ static void apercentile_finalizer(SQLFunctionCtx *pCtx) { return; } } - - resetResultInfo(pResInfo); + + doFinalizer(pCtx); } ///////////////////////////////////////////////////////////////////////////////// @@ -2871,7 +2874,7 @@ static void leastsquares_finalizer(SQLFunctionCtx *pCtx) { param[1][2] /= param[1][1]; sprintf(pCtx->aOutputBuf, "(%lf, %lf)", param[0][2], param[1][2]); - resetResultInfo(GET_RES_INFO(pCtx)); + doFinalizer(pCtx); } static void date_col_output_function(SQLFunctionCtx *pCtx) { @@ -2927,18 +2930,17 @@ static void tag_project_function(SQLFunctionCtx *pCtx) { INC_INIT_VAL(pCtx, pCtx->size); assert(pCtx->inputBytes == pCtx->outputBytes); -// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pCtx->order); for (int32_t i = 0; i < pCtx->size; ++i) { tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->outputType); - pCtx->aOutputBuf += pCtx->outputBytes/* * factor*/; + pCtx->aOutputBuf += pCtx->outputBytes; } } static void tag_project_function_f(SQLFunctionCtx *pCtx, int32_t index) { INC_INIT_VAL(pCtx, 1); tVariantDump(&pCtx->tag, pCtx->aOutputBuf, pCtx->tag.nType); - pCtx->aOutputBuf += pCtx->outputBytes/* * GET_FORWARD_DIRECTION_FACTOR(pCtx->order)*/; + pCtx->aOutputBuf += pCtx->outputBytes; } /** @@ -4183,7 +4185,7 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { } GET_RES_INFO(pCtx)->numOfRes = 1; - resetResultInfo(GET_RES_INFO(pCtx)); + doFinalizer(pCtx); } /** @@ -4345,7 +4347,7 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { strcpy(pCtx->aOutputBuf, pTSbuf->path); tsBufDestory(pTSbuf); - resetResultInfo(GET_RES_INFO(pCtx)); + doFinalizer(pCtx); } /* @@ -4385,7 +4387,7 @@ SQLAggFuncElem aAggs[28] = {{ count_function, count_function_f, no_next_step, - noop1, + doFinalizer, count_func_merge, count_func_merge, count_load_data_info, @@ -4628,7 +4630,7 @@ SQLAggFuncElem aAggs[28] = {{ date_col_output_function, date_col_output_function_f, no_next_step, - noop1, + doFinalizer, copy_function, copy_function, no_data_info, @@ -4643,7 +4645,7 @@ SQLAggFuncElem aAggs[28] = {{ noop1, noop2, no_next_step, - noop1, + doFinalizer, copy_function, copy_function, data_req_load_info, @@ -4658,7 +4660,7 @@ SQLAggFuncElem aAggs[28] = {{ tag_function, noop2, no_next_step, - noop1, + doFinalizer, copy_function, copy_function, no_data_info, @@ -4688,7 +4690,7 @@ SQLAggFuncElem aAggs[28] = {{ tag_function, tag_function_f, no_next_step, - noop1, + doFinalizer, copy_function, copy_function, no_data_info, @@ -4703,7 +4705,7 @@ SQLAggFuncElem aAggs[28] = {{ col_project_function, col_project_function_f, no_next_step, - noop1, + doFinalizer, copy_function, copy_function, data_req_load_info, @@ -4718,7 +4720,7 @@ SQLAggFuncElem aAggs[28] = {{ tag_project_function, tag_project_function_f, no_next_step, - noop1, + doFinalizer, copy_function, copy_function, no_data_info, @@ -4733,7 +4735,7 @@ SQLAggFuncElem aAggs[28] = {{ arithmetic_function, arithmetic_function_f, no_next_step, - noop1, + doFinalizer, copy_function, copy_function, data_req_load_info, @@ -4748,7 +4750,7 @@ SQLAggFuncElem aAggs[28] = {{ diff_function, diff_function_f, no_next_step, - noop1, + doFinalizer, noop1, noop1, data_req_load_info, @@ -4794,7 +4796,7 @@ SQLAggFuncElem aAggs[28] = {{ interp_function, do_sum_f, // todo filter handle no_next_step, - noop1, + doFinalizer, noop1, copy_function, no_data_info, diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 3f46d4fb54..dc86f924aa 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -279,9 +279,11 @@ void vnodePrintQueryStatistics(SMeterQuerySupportObj* pSupporter); void clearGroupResultBuf(SOutputRes* pOneOutputRes, int32_t nOutputCols); void copyGroupResultBuf(SOutputRes* dst, const SOutputRes* src, int32_t nOutputCols); -void resetResWindowInfo(SSlidingWindowResInfo* pWindowResInfo, int32_t numOfCols); -void clearCompletedResWindows(SSlidingWindowResInfo* pWindowResInfo, int32_t numOfCols); -int32_t numOfResFromResWindowInfo(SSlidingWindowResInfo* pWindowResInfo); +void resetSlidingWindowInfo(SSlidingWindowInfo* pSlidingWindowInfo, int32_t numOfCols); +void clearCompletedSlidingWindows(SSlidingWindowInfo* pSlidingWindowInfo, int32_t numOfCols); +int32_t numOfClosedSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo); +void closeSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo, int32_t slot); +void closeAllSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo); #ifdef __cplusplus } diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index eb8fb09bd3..ee88e5e366 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -122,7 +122,7 @@ typedef struct SWindowStatus { bool closed; } SWindowStatus; -typedef struct SSlidingWindowResInfo { +typedef struct SSlidingWindowInfo { SOutputRes* pResult; // reference to SQuerySupporter->pResult SWindowStatus* pStatus; // current query window closed or not? void* hashList; // hash list for quick access @@ -134,7 +134,7 @@ typedef struct SSlidingWindowResInfo { int64_t startTime; // start time of the first time window for sliding query int64_t prevSKey; // previous (not completed) sliding window start key int64_t threshold; // threshold for return completed results. -} SSlidingWindowResInfo; +} SSlidingWindowInfo; typedef struct SQueryRuntimeEnv { SPositionInfo startPos; /* the start position, used for secondary/third iteration */ @@ -159,14 +159,13 @@ typedef struct SQueryRuntimeEnv { SInterpolationInfo interpoInfo; SData** pInterpoBuf; - SSlidingWindowResInfo swindowResInfo; + SSlidingWindowInfo swindowResInfo; STSBuf* pTSBuf; STSCursor cur; SQueryCostSummary summary; - TSKEY intervalSKey; // skey of the complete time window, not affected by the actual data distribution - TSKEY intervalEKey; // ekey of the complete time window + STimeWindow intervalWindow; // the complete time window, not affected by the actual data distribution /* * Temporarily hold the in-memory cache block info during scan cache blocks @@ -296,7 +295,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo* pQInfo, SQuery* pQuery, void* param) void vnodeDecMeterRefcnt(SQInfo* pQInfo); /* sql query handle in dnode */ -void vnodeSingleMeterQuery(SSchedMsg* pMsg); +void vnodeSingleTableQuery(SSchedMsg* pMsg); /* * handle multi-meter query process diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index b0994c02da..046058f062 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -88,7 +88,7 @@ static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId); static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t pKey, int64_t keyFirst, int64_t keyLast, int64_t *actualSkey, int64_t *actualEkey, int64_t *skey, int64_t *ekey); -static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t *skey, int64_t *ekey); +static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow* pTimeWindow); // check the offset value integrity static FORCE_INLINE int32_t validateHeaderOffsetSegment(SQInfo *pQInfo, char *filePath, int32_t vid, char *data, @@ -1484,7 +1484,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t } } - TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalSKey : pRuntimeEnv->intervalEKey; + TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey : pRuntimeEnv->intervalWindow.ekey; setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, forwardStep, functionId, tpField, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); } @@ -1574,113 +1574,113 @@ static bool needToLoadDataBlock(SQuery *pQuery, SField *pField, SQLFunctionCtx * return true; } -static SOutputRes *getResWindow(SSlidingWindowResInfo *pWindowResInfo, char *pData, int16_t bytes, +static SOutputRes *doSetSlidingWindowFromKey(SSlidingWindowInfo *pSlidingWindowInfo, char *pData, int16_t bytes, SWindowStatus **pStatus) { int32_t p = -1; - int32_t *p1 = (int32_t *)taosGetDataFromHash(pWindowResInfo->hashList, pData, bytes); + int32_t *p1 = (int32_t *)taosGetDataFromHash(pSlidingWindowInfo->hashList, pData, bytes); if (p1 != NULL) { p = *p1; - pWindowResInfo->curIndex = p; + pSlidingWindowInfo->curIndex = p; if (pStatus != NULL) { - *pStatus = &pWindowResInfo->pStatus[p]; + *pStatus = &pSlidingWindowInfo->pStatus[p]; } } else { // more than the capacity, reallocate the resources - if (pWindowResInfo->size >= pWindowResInfo->capacity) { - int64_t newCap = pWindowResInfo->capacity * 2; + if (pSlidingWindowInfo->size >= pSlidingWindowInfo->capacity) { + int64_t newCap = pSlidingWindowInfo->capacity * 2; - char *t = realloc(pWindowResInfo->pStatus, newCap * sizeof(SWindowStatus)); + char *t = realloc(pSlidingWindowInfo->pStatus, newCap * sizeof(SWindowStatus)); if (t != NULL) { - pWindowResInfo->pStatus = (SWindowStatus *)t; - memset(&pWindowResInfo->pStatus[pWindowResInfo->capacity], 0, sizeof(SWindowStatus) * pWindowResInfo->capacity); + pSlidingWindowInfo->pStatus = (SWindowStatus *)t; + memset(&pSlidingWindowInfo->pStatus[pSlidingWindowInfo->capacity], 0, sizeof(SWindowStatus) * pSlidingWindowInfo->capacity); } else { // todo } - pWindowResInfo->capacity = newCap; + pSlidingWindowInfo->capacity = newCap; } // add a new result set for a new group if (pStatus != NULL) { - *pStatus = &pWindowResInfo->pStatus[pWindowResInfo->size]; + *pStatus = &pSlidingWindowInfo->pStatus[pSlidingWindowInfo->size]; } - p = pWindowResInfo->size; - pWindowResInfo->curIndex = pWindowResInfo->size; + p = pSlidingWindowInfo->size; + pSlidingWindowInfo->curIndex = pSlidingWindowInfo->size; - pWindowResInfo->size += 1; - taosAddToHashTable(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); + pSlidingWindowInfo->size += 1; + taosAddToHashTable(pSlidingWindowInfo->hashList, pData, bytes, (char *)&pSlidingWindowInfo->curIndex, sizeof(int32_t)); } - return &pWindowResInfo->pResult[p]; + return &pSlidingWindowInfo->pResult[p]; } -static int32_t initResWindowInfo(SSlidingWindowResInfo *pWindowResInfo, int32_t threshold, int16_t type, +static int32_t initResWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t threshold, int16_t type, SOutputRes *pRes) { - pWindowResInfo->capacity = threshold; - pWindowResInfo->threshold = threshold; + pSlidingWindowInfo->capacity = threshold; + pSlidingWindowInfo->threshold = threshold; - pWindowResInfo->type = type; + pSlidingWindowInfo->type = type; _hash_fn_t fn = taosGetDefaultHashFunction(type); - pWindowResInfo->hashList = taosInitHashTable(threshold, fn, false); + pSlidingWindowInfo->hashList = taosInitHashTable(threshold, fn, false); - pWindowResInfo->curIndex = -1; - pWindowResInfo->size = 0; - pWindowResInfo->pResult = pRes; - pWindowResInfo->pStatus = calloc(threshold, sizeof(SWindowStatus)); + pSlidingWindowInfo->curIndex = -1; + pSlidingWindowInfo->size = 0; + pSlidingWindowInfo->pResult = pRes; + pSlidingWindowInfo->pStatus = calloc(threshold, sizeof(SWindowStatus)); - if (pWindowResInfo->pStatus == NULL || pWindowResInfo->hashList == NULL) { + if (pSlidingWindowInfo->pStatus == NULL || pSlidingWindowInfo->hashList == NULL) { return -1; } return TSDB_CODE_SUCCESS; } -static void destroyResWindowInfo(SSlidingWindowResInfo *pWindowResInfo) { - if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { - assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL); +static void destroyResWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo) { + if (pSlidingWindowInfo == NULL || pSlidingWindowInfo->capacity == 0) { + assert(pSlidingWindowInfo->hashList == NULL && pSlidingWindowInfo->pResult == NULL); return; } - taosCleanUpHashTable(pWindowResInfo->hashList); - tfree(pWindowResInfo->pStatus); + taosCleanUpHashTable(pSlidingWindowInfo->hashList); + tfree(pSlidingWindowInfo->pStatus); } -void resetResWindowInfo(SSlidingWindowResInfo *pWindowResInfo, int32_t numOfCols) { - if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0) { +void resetSlidingWindowInfo(SSlidingWindowInfo *pSlidingWindowInfo, int32_t numOfCols) { + if (pSlidingWindowInfo == NULL || pSlidingWindowInfo->capacity == 0) { return; } - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SOutputRes *pOneRes = &pWindowResInfo->pResult[i]; + for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) { + SOutputRes *pOneRes = &pSlidingWindowInfo->pResult[i]; clearGroupResultBuf(pOneRes, numOfCols); } - memset(pWindowResInfo->pStatus, 0, sizeof(SWindowStatus) * pWindowResInfo->capacity); + memset(pSlidingWindowInfo->pStatus, 0, sizeof(SWindowStatus) * pSlidingWindowInfo->capacity); - pWindowResInfo->curIndex = -1; - taosCleanUpHashTable(pWindowResInfo->hashList); - pWindowResInfo->size = 0; + pSlidingWindowInfo->curIndex = -1; + taosCleanUpHashTable(pSlidingWindowInfo->hashList); + pSlidingWindowInfo->size = 0; - _hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type); - pWindowResInfo->hashList = taosInitHashTable(pWindowResInfo->capacity, fn, false); + _hash_fn_t fn = taosGetDefaultHashFunction(pSlidingWindowInfo->type); + pSlidingWindowInfo->hashList = taosInitHashTable(pSlidingWindowInfo->capacity, fn, false); - pWindowResInfo->startTime = 0; - pWindowResInfo->prevSKey = 0; + pSlidingWindowInfo->startTime = 0; + pSlidingWindowInfo->prevSKey = 0; } -void clearCompletedResWindows(SSlidingWindowResInfo *pWindowResInfo, int32_t numOfCols) { - if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) { +void clearCompletedSlidingWindows(SSlidingWindowInfo *pSlidingWindowInfo, int32_t numOfCols) { + if (pSlidingWindowInfo == NULL || pSlidingWindowInfo->capacity == 0 || pSlidingWindowInfo->size == 0) { return; } int32_t i = 0; - for (i = 0; i < pWindowResInfo->size; ++i) { - SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + for (i = 0; i < pSlidingWindowInfo->size; ++i) { + SWindowStatus *pStatus = &pSlidingWindowInfo->pStatus[i]; if (pStatus->closed) { // remove the window slot from hash table - taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pStatus->window.skey, TSDB_KEYSIZE); + taosDeleteFromHashTable(pSlidingWindowInfo->hashList, (const char *)&pStatus->window.skey, TSDB_KEYSIZE); } else { break; } @@ -1690,48 +1690,106 @@ void clearCompletedResWindows(SSlidingWindowResInfo *pWindowResInfo, int32_t num return; } - int32_t remain = pWindowResInfo->size - i; + int32_t remain = pSlidingWindowInfo->size - i; //clear remain list - memmove(pWindowResInfo->pStatus, &pWindowResInfo->pStatus[i], remain * sizeof(SWindowStatus)); - memset(&pWindowResInfo->pStatus[remain], 0, (pWindowResInfo->capacity - remain) * sizeof(SWindowStatus)); + memmove(pSlidingWindowInfo->pStatus, &pSlidingWindowInfo->pStatus[i], remain * sizeof(SWindowStatus)); + memset(&pSlidingWindowInfo->pStatus[remain], 0, (pSlidingWindowInfo->capacity - remain) * sizeof(SWindowStatus)); for(int32_t k = 0; k < remain; ++k) { - copyGroupResultBuf(&pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k], numOfCols); + copyGroupResultBuf(&pSlidingWindowInfo->pResult[k], &pSlidingWindowInfo->pResult[i + k], numOfCols); } - for(int32_t k = remain; k < pWindowResInfo->size; ++k) { - SOutputRes *pOneRes = &pWindowResInfo->pResult[k]; + for(int32_t k = remain; k < pSlidingWindowInfo->size; ++k) { + SOutputRes *pOneRes = &pSlidingWindowInfo->pResult[k]; clearGroupResultBuf(pOneRes, numOfCols); } - pWindowResInfo->size = remain; + pSlidingWindowInfo->size = remain; - for(int32_t k = 0; k < pWindowResInfo->size; ++k) { - SWindowStatus* pStatus = &pWindowResInfo->pStatus[k]; - int32_t *p = (int32_t*) taosGetDataFromHash(pWindowResInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE); + for(int32_t k = 0; k < pSlidingWindowInfo->size; ++k) { + SWindowStatus* pStatus = &pSlidingWindowInfo->pStatus[k]; + int32_t *p = (int32_t*) taosGetDataFromHash(pSlidingWindowInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE); int32_t v = *p; v = (v - i); - taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pStatus->window.skey, TSDB_KEYSIZE); + taosDeleteFromHashTable(pSlidingWindowInfo->hashList, (const char *)&pStatus->window.skey, TSDB_KEYSIZE); - taosAddToHashTable(pWindowResInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE, + taosAddToHashTable(pSlidingWindowInfo->hashList, (const char*)&pStatus->window.skey, TSDB_KEYSIZE, (char *)&v, sizeof(int32_t)); } - pWindowResInfo->curIndex = -1; + pSlidingWindowInfo->curIndex = -1; } -int32_t numOfResFromResWindowInfo(SSlidingWindowResInfo *pWindowResInfo) { - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; +int32_t numOfClosedSlidingWindow(SSlidingWindowInfo *pSlidingWindowInfo) { + for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) { + SWindowStatus *pStatus = &pSlidingWindowInfo->pStatus[i]; if (pStatus->closed == false) { return i; } } } -static SWindowStatus* getCurrentSWindow(SSlidingWindowResInfo *pWindowResInfo) { - return &pWindowResInfo->pStatus[pWindowResInfo->curIndex]; +void closeSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo, int32_t slot) { + assert(slot >= 0 && slot < pSlidingWindowInfo->size); + SWindowStatus* pStatus = &pSlidingWindowInfo->pStatus[slot]; + pStatus->closed = true; +} + +void closeAllSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo) { + assert(pSlidingWindowInfo->size >=0 && pSlidingWindowInfo->capacity >= pSlidingWindowInfo->size); + + for(int32_t i = 0; i < pSlidingWindowInfo->size; ++i) { + SWindowStatus* pStatus = &pSlidingWindowInfo->pStatus[i]; + pStatus->closed = true; + } +} + +static SWindowStatus* getSlidingWindowStatus(SSlidingWindowInfo *pSlidingWindowInfo, int32_t slot) { + return &pSlidingWindowInfo->pStatus[slot]; +} + +static bool slidingWindowClosed(SSlidingWindowInfo* pSlidingWindowInfo, int32_t slot) { + return (pSlidingWindowInfo->pStatus[slot].closed == true); +} + +static int32_t curSlidingWindow(SSlidingWindowInfo *pSlidingWindowInfo) { + assert(pSlidingWindowInfo->curIndex >= 0 && pSlidingWindowInfo->curIndex < pSlidingWindowInfo->size); + + return pSlidingWindowInfo->curIndex; +} + +// get the correct sliding window according to the handled timestamp +static STimeWindow getActiveSlidingWindow(SSlidingWindowInfo* pSlidingWindowInfo, int64_t ts, SQuery* pQuery) { + STimeWindow w = {0}; + + if (pSlidingWindowInfo->curIndex == -1) { // the first window, from the prevous stored value + w.skey = pSlidingWindowInfo->prevSKey; + w.ekey = w.skey + pQuery->nAggTimeInterval - 1; + + } else { + SWindowStatus* pStatus = getSlidingWindowStatus(pSlidingWindowInfo, curSlidingWindow(pSlidingWindowInfo)); + + if (pStatus->window.skey <= ts && pStatus->window.ekey >= ts) { + w = pStatus->window; + } else { + int64_t st = pStatus->window.skey; + + while (st > ts) { + st -= pQuery->slidingTime; + } + + while ((st + pQuery->nAggTimeInterval - 1) < ts) { + st += pQuery->slidingTime; + } + + w.skey = st; + w.ekey = w.skey + pQuery->nAggTimeInterval - 1; + } + } + + assert(ts >= w.skey && ts <= w.ekey); + return w; } static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { @@ -1739,7 +1797,7 @@ static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, return -1; } - SOutputRes *pOutputRes = getResWindow(&pRuntimeEnv->swindowResInfo, pData, bytes, NULL); + SOutputRes *pOutputRes = doSetSlidingWindowFromKey(&pRuntimeEnv->swindowResInfo, pData, bytes, NULL); if (pOutputRes == NULL) { return -1; } @@ -1750,17 +1808,20 @@ static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, return TSDB_CODE_SUCCESS; } -static int32_t setSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, int64_t skey, int64_t ekey) { - int64_t st = skey; - +static int32_t setSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow) { + assert(pTimeWindow->skey < pTimeWindow->ekey); + + int64_t st = pTimeWindow->skey; + SWindowStatus *pStatus = NULL; - SOutputRes * pOutputRes = getResWindow(&pRuntimeEnv->swindowResInfo, (char *)&st, TSDB_KEYSIZE, &pStatus); + SOutputRes* pOutputRes = doSetSlidingWindowFromKey(&pRuntimeEnv->swindowResInfo, (char *)&st, TSDB_KEYSIZE, + &pStatus); + if (pOutputRes == NULL) { return -1; } - pStatus->window = (STimeWindow){.skey = skey, .ekey = ekey}; - + pStatus->window = *pTimeWindow; setGroupOutputBuffer(pRuntimeEnv, pOutputRes); initCtxOutputBuf(pRuntimeEnv); @@ -1884,7 +1945,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * bool hasNull = hasNullVal(pQuery, k, pBlockInfo, pFields, isDiskFileBlock); char *dataBlock = getDataBlocks(pRuntimeEnv, &sasArray[k], k, *forwardStep); - TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalSKey : pRuntimeEnv->intervalEKey; + TSKEY ts = QUERY_IS_ASC_QUERY(pQuery) ? pRuntimeEnv->intervalWindow.skey : pRuntimeEnv->intervalWindow.ekey; setExecParams(pQuery, &pCtx[k], ts, dataBlock, (char *)primaryKeyCol, (*forwardStep), functionId, pFields, hasNull, pRuntimeEnv->blockStatus, &sasArray[k], pRuntimeEnv->scanFlag); } @@ -1936,34 +1997,11 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0) { // decide the time window according to the primary timestamp int64_t ts = primaryKeyCol[offset]; - int64_t wskey = 0; - int64_t wekey = 0; - - if (pRuntimeEnv->swindowResInfo.curIndex == -1) { - wskey = pRuntimeEnv->swindowResInfo.prevSKey; - wekey = wskey + pQuery->nAggTimeInterval - 1; - } else { - SWindowStatus *pStatus = &pRuntimeEnv->swindowResInfo.pStatus[pRuntimeEnv->swindowResInfo.curIndex]; - if (pStatus->window.skey <= ts && pStatus->window.ekey >= ts) { - wskey = pStatus->window.skey; - wekey = pStatus->window.ekey; - } else { - int64_t st = pStatus->window.skey; - - while (st > ts) { - st -= pQuery->slidingTime; - } - - while ((st + pQuery->nAggTimeInterval - 1) < ts) { - st += pQuery->slidingTime; - } - - wskey = st; - wekey = wskey + pQuery->nAggTimeInterval - 1; - } - } + + SSlidingWindowInfo* pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo; + STimeWindow win = getActiveSlidingWindow(pSlidingWindowInfo, ts, pQuery); - int32_t ret = setSlidingWindowFromKey(pRuntimeEnv, wskey, wekey); + int32_t ret = setSlidingWindowFromKey(pRuntimeEnv, &win); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -1973,9 +2011,10 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - pCtx[k].nStartQueryTimestamp = wskey; + pCtx[k].nStartQueryTimestamp = win.skey; + + SWindowStatus* pStatus = getSlidingWindowStatus(pSlidingWindowInfo, curSlidingWindow(pSlidingWindowInfo)); - SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo); if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { // qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld", // GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey); @@ -1990,25 +2029,27 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * lastKey = ts; int32_t index = pRuntimeEnv->swindowResInfo.curIndex; while (1) { - getNextLogicalQueryRange(pRuntimeEnv, &wskey, &wekey); - if (pRuntimeEnv->swindowResInfo.startTime > wskey || (wskey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (wskey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) { + STimeWindow nextWin = {0}; + + getNextLogicalQueryRange(pRuntimeEnv, &nextWin); + if (pSlidingWindowInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (nextWin.skey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) { pRuntimeEnv->swindowResInfo.curIndex = index; break; } - if (ts >= wskey && ts <= wekey) { + if (ts >= nextWin.skey && ts <= nextWin.ekey) { // null data, failed to allocate more memory buffer - if (setSlidingWindowFromKey(pRuntimeEnv, wskey, wekey) != TSDB_CODE_SUCCESS) { + if (setSlidingWindowFromKey(pRuntimeEnv, &nextWin) != TSDB_CODE_SUCCESS) { pRuntimeEnv->swindowResInfo.curIndex = index; break; } for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) { int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId; - pCtx[k].nStartQueryTimestamp = wskey; + pCtx[k].nStartQueryTimestamp = nextWin.skey; - SWindowStatus* pStatus = getCurrentSWindow(&pRuntimeEnv->swindowResInfo); + SWindowStatus* pStatus = getSlidingWindowStatus(pSlidingWindowInfo, curSlidingWindow(pSlidingWindowInfo)); if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) { // qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld", // GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey); @@ -2068,42 +2109,39 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * free(sasArray); if (pQuery->slidingTime > 0 && pQuery->nAggTimeInterval > 0 && IS_MASTER_SCAN(pRuntimeEnv)) { - SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; + SSlidingWindowInfo *pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo; // query completed if (lastKey >= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery) || (lastKey <= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; - pStatus->closed = true; - } - - pWindowResInfo->curIndex = pWindowResInfo->size - 1; + closeAllSlidingWindow(pSlidingWindowInfo); + + pSlidingWindowInfo->curIndex = pSlidingWindowInfo->size - 1; setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); } else { int32_t i = 0; int64_t skey = 0; - for (i = 0; i < pWindowResInfo->size; ++i) { - SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + for (i = 0; i < pSlidingWindowInfo->size; ++i) { + SWindowStatus *pStatus = &pSlidingWindowInfo->pStatus[i]; if ((pStatus->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || (pStatus->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { - pStatus->closed = true; + closeSlidingWindow(pSlidingWindowInfo, i); } else { skey = pStatus->window.skey; break; } } - pWindowResInfo->prevSKey = skey; + pSlidingWindowInfo->prevSKey = skey; // the number of completed slots are larger than the threshold, dump to client immediately. - int32_t v = numOfResFromResWindowInfo(pWindowResInfo); - if (v > pWindowResInfo->threshold) { + int32_t v = numOfClosedSlidingWindow(pSlidingWindowInfo); + if (v > pSlidingWindowInfo->threshold) { setQueryStatus(pQuery, QUERY_RESBUF_FULL); } - dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v); + dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pSlidingWindowInfo->size, v); } } @@ -2651,11 +2689,6 @@ bool isFixedOutputQuery(SQuery *pQuery) { continue; } - // // ignore the group by + projection combination - // if (pExprMsg->functionId == TSDB_FUNC_PRJ && isGroupbyNormalCol(pQuery)) { - // continue; - // } - if (!IS_MULTIOUTPUT(aAggs[pExprMsg->functionId].nStatus)) { return true; } @@ -3049,20 +3082,18 @@ static void getAlignedIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY ke pQuery->skey = skey1; pQuery->ekey = ekey1; - pRuntimeEnv->intervalSKey = windowSKey; - pRuntimeEnv->intervalEKey = windowEKey; + pRuntimeEnv->intervalWindow = (STimeWindow) {.skey = windowSKey, .ekey = windowEKey}; assert(pQuery->skey <= pQuery->ekey && - pRuntimeEnv->intervalSKey + (pQuery->nAggTimeInterval - 1) == pRuntimeEnv->intervalEKey); + pRuntimeEnv->intervalWindow.skey + (pQuery->nAggTimeInterval - 1) == pRuntimeEnv->intervalWindow.ekey); } else { pQuery->skey = ekey1; pQuery->ekey = skey1; - - pRuntimeEnv->intervalSKey = windowEKey; - pRuntimeEnv->intervalEKey = windowSKey; - + + pRuntimeEnv->intervalWindow = (STimeWindow) {.skey = windowEKey, .ekey = windowSKey}; + assert(pQuery->skey >= pQuery->ekey && - pRuntimeEnv->intervalSKey - (pQuery->nAggTimeInterval - 1) == pRuntimeEnv->intervalEKey); + pRuntimeEnv->intervalWindow.skey - (pQuery->nAggTimeInterval - 1) == pRuntimeEnv->intervalWindow.ekey); } pQuery->lastKey = pQuery->skey; @@ -4885,13 +4916,13 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl } } -static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, int64_t *skey, int64_t *ekey) { +static void getNextLogicalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow* pTimeWindow) { SQuery *pQuery = pRuntimeEnv->pQuery; int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - - *skey += (pQuery->slidingTime * factor); - *ekey += (pQuery->slidingTime * factor); + + pTimeWindow->skey += (pQuery->slidingTime * factor); + pTimeWindow->ekey += (pQuery->slidingTime * factor); } static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { @@ -5606,15 +5637,15 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv *pRuntimeEnv, int32_t order) { pRuntimeEnv->pCtx[i].order = (pRuntimeEnv->pCtx[i].order) ^ 1; } - SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; + SSlidingWindowInfo *pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo; - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) { + SWindowStatus *pStatus = &pSlidingWindowInfo->pStatus[i]; if (!pStatus->closed) { continue; } - SOutputRes *buf = &pWindowResInfo->pResult[i]; + SOutputRes *buf = &pSlidingWindowInfo->pResult[i]; // open/close the specified query for each group result for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -5748,7 +5779,6 @@ void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) { void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) { SQuery *pQuery = pRuntimeEnv->pQuery; -// int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); // reset the execution contexts for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { @@ -5860,13 +5890,13 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus pQuery->lastKey = pQuery->skey; pRuntimeEnv->startPos = pRuntimeEnv->endPos; - SWAP(pRuntimeEnv->intervalSKey, pRuntimeEnv->intervalEKey, TSKEY); + SWAP(pRuntimeEnv->intervalWindow.skey, pRuntimeEnv->intervalWindow.ekey, TSKEY); } static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus) { SQuery *pQuery = pRuntimeEnv->pQuery; SWAP(pQuery->skey, pQuery->ekey, TSKEY); - SWAP(pRuntimeEnv->intervalSKey, pRuntimeEnv->intervalEKey, TSKEY); + SWAP(pRuntimeEnv->intervalWindow.skey, pRuntimeEnv->intervalWindow.ekey, TSKEY); pQuery->lastKey = pStatus->lastKey; pQuery->skey = pStatus->skey; @@ -5940,12 +5970,12 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { // for each group result, call the finalize function for each column - SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; + SSlidingWindowInfo *pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo; - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SOutputRes *buf = &pWindowResInfo->pResult[i]; + for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) { + SOutputRes *buf = &pSlidingWindowInfo->pResult[i]; - SWindowStatus *pStatus = &pWindowResInfo->pStatus[i]; + SWindowStatus *pStatus = &pSlidingWindowInfo->pStatus[i]; if (!pStatus->closed) { continue; } @@ -6002,17 +6032,14 @@ void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { // for each group result, call the finalize function for each column - SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; - bool groupbyCol = isGroupbyNormalCol(pQuery->pGroupbyExpr); + SSlidingWindowInfo *pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo; + if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + closeAllSlidingWindow(pSlidingWindowInfo); + } - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SOutputRes *buf = &pWindowResInfo->pResult[i]; - SWindowStatus* pStatus = &pWindowResInfo->pStatus[i]; - if (groupbyCol) { - pStatus->closed = true; - } - - if (!pStatus->closed) { + for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) { + SOutputRes *buf = &pSlidingWindowInfo->pResult[i]; + if (!slidingWindowClosed(pSlidingWindowInfo, i)) { continue; } @@ -6079,12 +6106,9 @@ static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQue SQuery *pQuery = pRuntimeEnv->pQuery; int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); - printf("------------------------%lld, %lld\n", pQuery->skey, pQuery->ekey); - - *skey = pRuntimeEnv->intervalSKey + (pQuery->slidingTime * factor); - *ekey = pRuntimeEnv->intervalEKey + (pQuery->slidingTime * factor); - printf("new window:%lld, %lld\n", *skey, *ekey); + *skey = pRuntimeEnv->intervalWindow.skey + (pQuery->slidingTime * factor); + *ekey = pRuntimeEnv->intervalWindow.ekey + (pQuery->slidingTime * factor); if (pQuery->slidingTime > 0) { if (QUERY_IS_ASC_QUERY(pQuery)) { @@ -6095,8 +6119,6 @@ static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQue if (*skey > pSupporter->rawEKey) { return QUERY_COMPLETED; - // setQueryStatus(pQuery, QUERY_COMPLETED); - // return; } if (*ekey > pSupporter->rawEKey) { @@ -6108,7 +6130,6 @@ static int32_t getNextIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQue } if (*skey < pSupporter->rawEKey) { - // setQueryStatus(pQuery, QUERY_COMPLETED); return QUERY_COMPLETED; } @@ -6143,7 +6164,7 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE return; } - getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalSKey, &pRuntimeEnv->intervalEKey); + getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalWindow); /* ensure the search in cache will return right position */ pQuery->lastKey = pQuery->skey; @@ -7472,7 +7493,7 @@ static int32_t getSubsetNumber(SMeterQuerySupportObj *pSupporter) { int32_t totalSubset = 0; if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { - totalSubset = numOfResFromResWindowInfo(&pSupporter->runtimeEnv.swindowResInfo); + totalSubset = numOfClosedSlidingWindow(&pSupporter->runtimeEnv.swindowResInfo); } else { totalSubset = pSupporter->pSidSet->numOfSubSet; } diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 73cb456f0b..ebfc246f54 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -88,22 +88,10 @@ static void setStartPositionForCacheBlock(SQuery *pQuery, SCacheBlock *pBlock, b static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) { SQuery* pQuery = pRuntimeEnv->pQuery; - // enable execution for next table - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->nAggTimeInterval > 0 && pQuery->slidingTime > 0)) { - SSlidingWindowResInfo *pWindowResInfo = &pRuntimeEnv->swindowResInfo; - - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SOutputRes *buf = &pWindowResInfo->pResult[i]; - for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - buf->resultInfo[j].complete = false; - } - } - } else { - for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { - SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]); - if (pResInfo != NULL) { - pResInfo->complete = false; - } + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + SResultInfo *pResInfo = GET_RES_INFO(&pRuntimeEnv->pCtx[i]); + if (pResInfo != NULL) { + pResInfo->complete = false; } } } @@ -535,6 +523,7 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool * } } + initCtxOutputBuf(pRuntimeEnv); return true; } @@ -572,13 +561,8 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start vnodeScanAllData(pRuntimeEnv); - // enable execution for next table - enableExecutionForNextTable(pRuntimeEnv); - // first/last_row query, do not invoke the finalize for super table query - if (!isFirstLastRowQuery(pQuery)) { - doFinalizeResult(pRuntimeEnv); - } + doFinalizeResult(pRuntimeEnv); int64_t numOfRes = getNumOfResult(pRuntimeEnv); assert(numOfRes == 1 || numOfRes == 0); @@ -592,7 +576,14 @@ static int64_t doCheckMetersInGroup(SQInfo *pQInfo, int32_t index, int32_t start return numOfRes; } -static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { +/** + * super table query handler + * 1. super table projection query, group-by on normal columns query, ts-comp query + * 2. point interpolation query, last row query + * + * @param pQInfo + */ +static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { SMeterQuerySupportObj *pSupporter = pQInfo->pMeterQuerySupporter; SMeterSidExtInfo **pMeterSidExtInfo = pSupporter->pMeterSidExtInfo; @@ -601,8 +592,8 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { SQuery * pQuery = &pQInfo->query; tSidSet *pSids = pSupporter->pSidSet; - SMeterObj *pOneMeter = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid); - + int32_t vid = getMeterObj(pSupporter->pMetersHashTable, pMeterSidExtInfo[0]->sid)->vnode; + resetCtxOutputBuf(pRuntimeEnv); if (isPointInterpoQuery(pQuery)) { @@ -613,7 +604,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { int32_t end = pSids->starterPos[pSupporter->subgroupIdx + 1] - 1; if (isFirstLastRowQuery(pQuery)) { - dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, pOneMeter->vnode, + dTrace("QInfo:%p last_row query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet, pSupporter->subgroupIdx); TSKEY key = -1; @@ -646,7 +637,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { int64_t num = doCheckMetersInGroup(pQInfo, index, start); assert(num >= 0); } else { - dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, pOneMeter->vnode, + dTrace("QInfo:%p interp query on vid:%d, numOfGroups:%d, current group:%d", pQInfo, vid, pSids->numOfSubSet, pSupporter->subgroupIdx); for (int32_t k = start; k <= end; ++k) { @@ -673,7 +664,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { } } } else { - // this procedure treats all tables as single group + /* + * 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query + */ assert(pSupporter->meterIdx >= 0); /* @@ -693,9 +686,9 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { return; } - resetResWindowInfo(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols); + resetSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols); - while (pSupporter->meterIdx < pSupporter->numOfMeters) { + while (pSupporter->meterIdx < pSupporter->numOfMeters) { int32_t k = pSupporter->meterIdx; if (isQueryKilled(pQuery)) { @@ -752,7 +745,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { break; } - // enable execution for next table + // enable execution for next table, when handling the projection query enableExecutionForNextTable(pRuntimeEnv); if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { @@ -772,8 +765,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { break; } - } else { - // forward query range + } else { // forward query range pQuery->skey = pQuery->lastKey; // all data in the result buffer are skipped due to the offset, continue to retrieve data from current meter @@ -789,7 +781,18 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { } } - if (!isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFirstLastRowQuery(pQuery)) { + /* + * 1. super table projection query, group-by on normal columns query, ts-comp query + * 2. point interpolation query, last row query + * + * group-by on normal columns query and last_row query do NOT invoke the finalizer here, + * since the finalize stage will be done at the client side. + * + * projection query, point interpolation query do not need the finalizer. + * + * Only the ts-comp query requires the finalizer function to be executed here. + */ + if (isTSCompQuery(pQuery)) { doFinalizeResult(pRuntimeEnv); } @@ -799,11 +802,11 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { // todo refactor if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - SSlidingWindowResInfo* pWindowResInfo = &pRuntimeEnv->swindowResInfo; + SSlidingWindowInfo* pSlidingWindowInfo = &pRuntimeEnv->swindowResInfo; - for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - SOutputRes *buf = &pWindowResInfo->pResult[i]; - pWindowResInfo->pStatus[i].closed = true; // enable return all results for group by normal columns + for (int32_t i = 0; i < pSlidingWindowInfo->size; ++i) { + SOutputRes *buf = &pSlidingWindowInfo->pResult[i]; + pSlidingWindowInfo->pStatus[i].closed = true; // enable return all results for group by normal columns for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { buf->numOfRows = MAX(buf->numOfRows, buf->resultInfo[j].numOfRes); @@ -812,7 +815,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { pQInfo->pMeterQuerySupporter->subgroupIdx = 0; pQuery->pointsRead = 0; - copyFromGroupBuf(pQInfo, pWindowResInfo->pResult); + copyFromGroupBuf(pQInfo, pSlidingWindowInfo->pResult); } pQInfo->pointsRead += pQuery->pointsRead; @@ -821,7 +824,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) { dTrace( "QInfo %p vid:%d, numOfMeters:%d, index:%d, numOfGroups:%d, %d points returned, totalRead:%d totalReturn:%d," "next skey:%" PRId64 ", offset:%" PRId64, - pQInfo, pOneMeter->vnode, pSids->numOfSids, pSupporter->meterIdx, pSids->numOfSubSet, pQuery->pointsRead, + pQInfo, vid, pSids->numOfSids, pSupporter->meterIdx, pSids->numOfSubSet, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned, pQuery->skey, pQuery->limit.offset); } @@ -979,7 +982,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { * select count(*)/top(field,k)/avg(field name) from table_name [where ts>now-1a]; * select count(*) from table_name group by status_column; */ -static void vnodeSingleMeterFixedOutputProcessor(SQInfo *pQInfo) { +static void vnodeSingleTableFixedOutputProcessor(SQInfo *pQInfo) { SQuery * pQuery = &pQInfo->query; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pMeterQuerySupporter->runtimeEnv; @@ -1002,19 +1005,13 @@ static void vnodeSingleMeterFixedOutputProcessor(SQInfo *pQInfo) { assert(isTopBottomQuery(pQuery)); } - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { - pQInfo->pMeterQuerySupporter->subgroupIdx = 0; - pQuery->pointsRead = 0; - copyFromGroupBuf(pQInfo, pRuntimeEnv->swindowResInfo.pResult); - } - doSkipResults(pRuntimeEnv); doRevisedResultsByLimit(pQInfo); pQInfo->pointsRead = pQuery->pointsRead; } -static void vnodeSingleMeterMultiOutputProcessor(SQInfo *pQInfo) { +static void vnodeSingleTableMultiOutputProcessor(SQInfo *pQInfo) { SQuery * pQuery = &pQInfo->query; SMeterObj *pMeterObj = pQInfo->pObj; @@ -1083,7 +1080,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter (pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))); initCtxOutputBuf(pRuntimeEnv); - clearCompletedResWindows(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols); + clearCompletedSlidingWindows(&pRuntimeEnv->swindowResInfo, pQuery->numOfOutputCols); vnodeScanAllData(pRuntimeEnv); if (isQueryKilled(pQuery)) { @@ -1133,7 +1130,7 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter } /* handle time interval query on single table */ -static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) { +static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { SQuery * pQuery = &(pQInfo->query); SMeterObj *pMeterObj = pQInfo->pObj; @@ -1187,7 +1184,7 @@ static void vnodeSingleMeterIntervalProcessor(SQInfo *pQInfo) { pQInfo->pointsRead - pQInfo->pointsInterpo, pQInfo->pointsInterpo, pQInfo->pointsReturned); } -void vnodeSingleMeterQuery(SSchedMsg *pMsg) { +void vnodeSingleTableQuery(SSchedMsg *pMsg) { SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; if (pQInfo == NULL || pQInfo->pMeterQuerySupporter == NULL) { @@ -1280,16 +1277,17 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { int64_t st = taosGetTimestampUs(); - if (pQuery->nAggTimeInterval != 0) { // interval (down sampling operation) + // group by normal column, sliding window query, interval query are handled by interval query processor + if (pQuery->nAggTimeInterval != 0 || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // interval (down sampling operation) assert(pQuery->checkBufferInLoop == 0 && pQuery->pointsOffset == pQuery->pointsToRead); - vnodeSingleMeterIntervalProcessor(pQInfo); + vnodeSingleTableIntervalProcessor(pQInfo); } else { if (isFixedOutputQuery(pQuery)) { assert(pQuery->checkBufferInLoop == 0); - vnodeSingleMeterFixedOutputProcessor(pQInfo); + vnodeSingleTableFixedOutputProcessor(pQInfo); } else { // diff/add/multiply/subtract/division assert(pQuery->checkBufferInLoop == 1); - vnodeSingleMeterMultiOutputProcessor(pQInfo); + vnodeSingleTableMultiOutputProcessor(pQInfo); } } @@ -1336,7 +1334,7 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { assert((pQuery->checkBufferInLoop == 1 && pQuery->nAggTimeInterval == 0) || isPointInterpoQuery(pQuery) || isGroupbyNormalCol(pQuery->pGroupbyExpr)); - vnodeMultiMeterMultiOutputProcessor(pQInfo); + vnodeSTableSeqProcessor(pQInfo); } /* record the total elapsed time */ diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index cc6da83671..c7658ca9ac 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -673,7 +673,7 @@ void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE return pQInfo; } - schedMsg.fp = vnodeSingleMeterQuery; + schedMsg.fp = vnodeSingleTableQuery; } /* @@ -891,7 +891,7 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) { if (pQInfo->pMeterQuerySupporter != NULL) { if (pQInfo->pMeterQuerySupporter->pSidSet == NULL) { - schedMsg.fp = vnodeSingleMeterQuery; + schedMsg.fp = vnodeSingleTableQuery; } else { // group by tag schedMsg.fp = vnodeMultiMeterQuery; } -- GitLab