diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5df3b14a5b1bedd6a75971320fc4e0560eb70527..be79054c1b1b5aa40cd566cc1a50553660ec7062 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -732,7 +732,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3 STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); -int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); extern void doDestroyExchangeOperatorInfo(void* param); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 780e371339ab9daf0f28b8d6fb1b78b894f8be32..9c85ebae9dac6bcf9197ebe8b2e4c118fb1b5ae4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1449,16 +1449,21 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t return TSDB_CODE_SUCCESS; } -int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { +int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) { // todo add more information about exchange operation int32_t type = pOperator->operatorType; - if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) { + if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN) { *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; + } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { + if (!inheritUsOrder) { + *order = TSDB_ORDER_ASC; + } + *scanFlag = MAIN_SCAN; + return TSDB_CODE_SUCCESS; } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = pOperator->info; *order = pTableScanInfo->base.cond.order; @@ -1473,7 +1478,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { return TSDB_CODE_INVALID_PARA; } else { - return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag); + return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder); } } } @@ -1589,7 +1594,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } hasValidBlock = true; - int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); + int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false); if (code != TSDB_CODE_SUCCESS) { destroyDataBlockForEmptyInput(blockAllocated, &pBlock); T_LONG_JMP(pTaskInfo->env, code); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index adb045a69fa874e6f7beddd63eadb0199ddb9bfe..2a33e3527ac02b82cb12832b8f484d321f1b220e 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -69,7 +69,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; - getTableScanInfo(pOperator, &order, &scanFlag); + getTableScanInfo(pOperator, &order, &scanFlag, false); int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey; taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); @@ -128,7 +128,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; - getTableScanInfo(pOperator, &order, &scanFlag); + getTableScanInfo(pOperator, &order, &scanFlag, false); doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > 0) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5ef82f0e0844519f695324818bc70f1f8b42b711..0fc2cdb46a6d6c4d4ac8dba7054778b747cf7512 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -383,7 +383,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { break; } - int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); + int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 3ae114c6563f8af52075e243d00a5a2f75ba6167..49bc5af634b82c410e903d023d8c1e5e62b9d3fd 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -289,7 +289,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - int32_t code = getTableScanInfo(downstream, &order, &scanFlag); + int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -441,7 +441,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp SExprSupp* pSup = &pOperator->exprSupp; // the pDataBlock are always the same one, no need to call this again - int32_t code = getTableScanInfo(downstream, &order, &scanFlag); + int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 20d4f46eaf5e2f87dfe91515aa7885e434cb74ad..6411d862ae20ba89b3de24a7c400d934a7a69180 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1072,7 +1072,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag); + getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag, true); if (pInfo->scalarSupp.pExprInfo != NULL) { SExprSupp* pExprSup = &pInfo->scalarSupp; @@ -4294,7 +4294,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { } } - getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag); + getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag, false); setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); @@ -4621,7 +4621,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); + getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag, false); setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 7b6278c916aa79329576242de087c7a50a0c6e8f..9938bba1b26556157bc56d7877b82d1de9bdaf7b 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -945,7 +945,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/function_null.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count_partition.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_partition.py -Q 3 -#,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/max_min_last_interval.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/last_row.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tsbsQuery.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/sml.py -Q 3