diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 8fd9b27d3fdebbd42d29bde60e611c318311504b..851102b8e383d35b7dcc914450d934f433a8f4a9 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -714,12 +714,13 @@ static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY return BLK_DATA_NO_NEEDED; } - SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); - if (pInfo->hasResult != DATA_SET_FLAG) { - return BLK_DATA_ALL_NEEDED; - } else { - return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; - } + return BLK_DATA_ALL_NEEDED; +// SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes); +// if (pInfo->hasResult != DATA_SET_FLAG) { +// return BLK_DATA_ALL_NEEDED; +// } else { +// return (pInfo->ts > end) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED; +// } } ////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/src/inc/tsqlfunction.h b/src/inc/tsqlfunction.h index 93f50cf4f3862e7d2747c10399858c3be0562072..b3f095507f3f3b649d4181ee6c7326206c252872 100644 --- a/src/inc/tsqlfunction.h +++ b/src/inc/tsqlfunction.h @@ -100,10 +100,11 @@ extern "C" { #define TOP_BOTTOM_QUERY_LIMIT 100 enum { - MASTER_SCAN = 0x0, - SUPPLEMENTARY_SCAN = 0x1, - FIRST_STAGE_MERGE = 0x10, - SECONDARY_STAGE_MERGE = 0x20, + MASTER_SCAN = 0x0u, + SUPPLEMENTARY_SCAN = 0x1u, + REPEAT_SCAN = 0x2u, //repeat scan belongs to the master scan + FIRST_STAGE_MERGE = 0x10u, + SECONDARY_STAGE_MERGE = 0x20u, }; #define QUERY_IS_STABLE_QUERY(type) (((type)&TSDB_QUERY_TYPE_STABLE_QUERY) != 0) @@ -164,7 +165,7 @@ typedef struct SQLFunctionCtx { int32_t startOffset; int32_t size; // number of rows int32_t order; // asc|desc - int32_t scanFlag; // TODO merge with currentStage + uint32_t scanFlag; // TODO merge with currentStage int16_t inputType; int16_t inputBytes; diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index efad07ac24c2ece25ffe49167b09e24011b5081a..6df52479c012757e766d4f811c4d65458a3ee32a 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -111,8 +111,8 @@ typedef enum { DISK_DATA_DISCARDED = 0x01, } vnodeDiskLoadStatus; -#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN) -#define IS_SUPPLEMENT_SCAN(runtime) (!IS_MASTER_SCAN(runtime)) +#define IS_MASTER_SCAN(runtime) (((runtime)->scanFlag & 1u) == MASTER_SCAN) +#define IS_SUPPLEMENT_SCAN(runtime) ((runtime)->scanFlag == SUPPLEMENTARY_SCAN) #define SET_SUPPLEMENT_SCAN_FLAG(runtime) ((runtime)->scanFlag = SUPPLEMENTARY_SCAN) #define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN) diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 0d6ba7ef392d77ef08ef407a7e86c5bf3365dc46..dc2664ee559f6b65f18f2f6228b1debd6c11fdb2 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -2138,19 +2138,30 @@ void closeAllTimeWindow(SWindowResInfo *pWindowResInfo) { } } -static int32_t setGroupResultFromKey(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { +static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pData, int16_t type, int16_t bytes) { if (isNull(pData, type)) { // ignore the null value return -1; } - + + int32_t GROUPRESULTID = 1; + + SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes); if (pWindowRes == NULL) { return -1; } + // not assign result buffer yet, add new result buffer + if (pWindowRes->pos.pageId == -1) { + int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage); + if (ret != 0) { + return -1; + } + } + setWindowResOutputBuf(pRuntimeEnv, pWindowRes); initCtxOutputBuf(pRuntimeEnv); - return TSDB_CODE_SUCCESS; } @@ -2368,7 +2379,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * if (groupbyStateValue) { char *stateVal = groupbyColumnData + bytes * offset; - int32_t ret = setGroupResultFromKey(pRuntimeEnv, stateVal, type, bytes); + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, stateVal, type, bytes); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -2495,7 +2506,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockI } TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; - doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); + doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); //todo refactor merge // interval query with limit applied if (isIntervalQuery(pQuery) && pQuery->limit.limit > 0 && @@ -4815,12 +4826,6 @@ int32_t vnodeSTableQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) { } initWindowResInfo(&pRuntimeEnv->windowResInfo, pRuntimeEnv, 512, 4096, type); - // } - // } else { - // ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize); - // if (ret != TSDB_CODE_SUCCESS) { - // return ret; - // } } pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, true); @@ -5702,7 +5707,7 @@ int32_t doMergeMetersResultsToGroupRes(STableQuerySupportObj *pSupporter, SQuery char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes); TSKEY ts = GET_INT64_VAL(b); - assert(ts > 0 && ts == pWindowRes->window.skey); + assert(ts == pWindowRes->window.skey); int64_t num = getNumOfResultWindowRes(pRuntimeEnv, pWindowRes); if (num <= 0) { cs.position[pos] += 1; @@ -6243,7 +6248,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) { void doFinalizeResult(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery) && pQuery->slidingTime > 0)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) { // for each group result, call the finalize function for each column SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { @@ -6314,48 +6319,6 @@ int64_t getNumOfResult(SQueryRuntimeEnv *pRuntimeEnv) { return maxOutput; } -/* - * forward the query range for next interval query - */ -// void forwardIntervalQueryRange(STableQuerySupportObj *pSupporter, SQueryRuntimeEnv *pRuntimeEnv) { -// SQuery *pQuery = pRuntimeEnv->pQuery; -// if (pQuery->slidingTime > 0 && isIntervalQuery(pQuery)) { -// if ((QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey >= pQuery->ekey) || -// (!QUERY_IS_ASC_QUERY(pQuery) && pQuery->lastKey <= pQuery->ekey)) { -// setQueryStatus(pQuery, QUERY_COMPLETED); -// } else { -// /*TSKEY nextTimestamp =*/loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); -// } -// -// return; -// } -// -// // int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey); -// // if (r == QUERY_COMPLETED) { -// // setQueryStatus(pQuery, QUERY_COMPLETED); -// // return; -// // } -// // -// // getNextTimeWindow(pRuntimeEnv, &pRuntimeEnv->intervalWindow); -// // -// // /* ensure the search in cache will return right position */ -// // pQuery->lastKey = pQuery->skey; -// // -// // TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos); -// // if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || -// // (nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) || -// // Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { -// // setQueryStatus(pQuery, QUERY_COMPLETED); -// // return; -// // } -// // -// // // bridge the gap in group by time function -// // if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || -// // (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { -// // getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey); -// // } -//} - static int32_t offsetComparator(const void *pLeft, const void *pRight) { SMeterDataInfo **pLeft1 = (SMeterDataInfo **)pLeft; SMeterDataInfo **pRight1 = (SMeterDataInfo **)pRight; @@ -7072,7 +7035,7 @@ void setIntervalQueryRange(SMeterQueryInfo *pMeterQueryInfo, STableQuerySupportO SWindowResInfo *pWindowResInfo = &pMeterQueryInfo->windowResInfo; doGetAlignedIntervalQueryRangeImpl(pQuery, win.skey, win.skey, win.ekey, &skey1, &ekey1, &windowSKey, &windowEKey); - pWindowResInfo->startTime = windowSKey; + pWindowResInfo->startTime = windowSKey; // windowSKey may be 0 in case of 1970 timestamp // assert(pWindowResInfo->startTime > 0); if (pWindowResInfo->prevSKey == 0) { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index deccc1d16129177964d15b1f1fd83aab230dada9..c52a27713e72efebeb75856ed80a0f5a3e309391 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -1174,10 +1174,11 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { } // all data scanned, the group by normal column can return - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {//todo refactor with merge interval time result pSupporter->subgroupIdx = 0; pQuery->pointsRead = 0; copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); + clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx); } pQInfo->pointsRead += pQuery->pointsRead;