diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index 50e749c927edcc32d518632c540093fea7e3d4e7..71b1b0fc68895358b7f411d730cebde3f15f42a5 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -1225,9 +1225,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } else { // other queries // decide which group this rows belongs to according to current state value if (groupbyStateValue) { - char *stateVal = groupbyColumnData + bytes * offset; + char *val = groupbyColumnData + bytes * offset; - int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, stateVal, type, bytes); + int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, val, type, bytes); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -1268,10 +1268,10 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pDataBlockInfo, - SDataStatis *pStatis, __block_search_fn_t searchFn, - SWindowResInfo *pWindowResInfo, SArray *pDataBlock) { + SDataStatis *pStatis, __block_search_fn_t searchFn, SArray *pDataBlock) { SQuery *pQuery = pRuntimeEnv->pQuery; - + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL || isGroupbyNormalCol(pQuery->pGroupbyExpr)) { rowwiseApplyFunctions(pRuntimeEnv, pStatis, pDataBlockInfo, pWindowResInfo, pDataBlock); } else { @@ -2446,6 +2446,7 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) { static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SQuery *pQuery = pRuntimeEnv->pQuery; + qTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d", GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order); @@ -2478,7 +2479,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } } - // in case of prj/diff query, ensure the output buffer is sufficient to accomodate the results of current block + // in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block if (!isIntervalQuery(pQuery) && !isGroupbyNormalCol(pQuery->pGroupbyExpr) && !isFixedOutputQuery(pQuery)) { SResultRec *pRec = &pQuery->rec; @@ -2507,8 +2508,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { SArray * pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis); pQuery->pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : blockInfo.rows - 1; - int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, - &pRuntimeEnv->windowResInfo, pDataBlock); + int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock); qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); @@ -2519,7 +2519,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } } - // if the result buffer is not full, set the query completed flag + // if the result buffer is not full, set the query complete if (!Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)) { setQueryStatus(pQuery, QUERY_COMPLETED); } @@ -2530,7 +2530,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { closeAllTimeWindow(&pRuntimeEnv->windowResInfo); removeRedundantWindow(&pRuntimeEnv->windowResInfo, pQuery->lastKey - step, step); - pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; + pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window } else { assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); } @@ -3436,7 +3436,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { STsdbQueryCond cond = { .twindow = qstatus.curWindow, - .order = pQuery->order.order, + .order = pQuery->order.order, .colList = pQuery->colList, .numOfCols = pQuery->numOfCols, }; @@ -3457,7 +3457,7 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } } - if (!needReverseScan(pQuery)) { + if (pRuntimeEnv->stableQuery || !needReverseScan(pQuery)) { return; } @@ -4060,8 +4060,7 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc pQuery->lastKey = keys[pQuery->pos]; pQuery->limit.offset = 0; - int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, - &pRuntimeEnv->windowResInfo, pDataBlock); + int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, pBlockInfo, NULL, binarySearchForKey, pDataBlock); qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes); @@ -4166,8 +4165,7 @@ static bool skipTimeInterval(SQueryRuntimeEnv *pRuntimeEnv) { pQuery->lastKey = ((TSKEY *)pColInfoData->pData)[startPos]; pWindowResInfo->prevSKey = tw.skey; - int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, - &pRuntimeEnv->windowResInfo, pDataBlock); + int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, NULL, binarySearchForKey, pDataBlock); qTrace("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", rows:%d, res:%d", GET_QINFO_ADDR(pRuntimeEnv), blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, numOfRes); @@ -4454,6 +4452,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { .numOfCols = pQuery->numOfCols, }; + // todo refactor SArray *g1 = taosArrayInit(1, POINTER_BYTES); SArray *tx = taosArrayInit(1, sizeof(STableId)); @@ -4624,10 +4623,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) { SGroupItem *item = taosArrayGet(group, pQInfo->tableIndex); STableQueryInfo *pInfo = item->info; - if (pInfo->lastKey > 0) { - pQuery->window.skey = pInfo->lastKey; - } - if (!multiTableMultioutputHelper(pQInfo, pQInfo->tableIndex)) { pQInfo->tableIndex++; continue; @@ -4635,7 +4630,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { // SPointInterpoSupporter pointInterpSupporter = {0}; - // TODO handle the limit problem + // TODO handle the limit offset problem if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) { // skipBlocks(pRuntimeEnv);