提交 1081d688 编写于 作者: G Ganlin Zhao

add inherit from upstream order option for exchange

上级 c8a41e34
...@@ -732,7 +732,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3 ...@@ -732,7 +732,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order); STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
extern void doDestroyExchangeOperatorInfo(void* param); extern void doDestroyExchangeOperatorInfo(void* param);
......
...@@ -1449,7 +1449,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -1449,7 +1449,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
// todo add more information about exchange operation // todo add more information about exchange operation
int32_t type = pOperator->operatorType; int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ||
...@@ -1459,7 +1459,9 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -1459,7 +1459,9 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
*scanFlag = MAIN_SCAN; *scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) { } else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
// for exchange operator inherit order from upstream and do not overwrite here if (!inheritUsOrder) {
*order = TSDB_ORDER_ASC;
}
*scanFlag = MAIN_SCAN; *scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { } else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
...@@ -1476,7 +1478,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan ...@@ -1476,7 +1478,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) { if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} else { } else {
return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag); return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder);
} }
} }
} }
...@@ -1592,7 +1594,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { ...@@ -1592,7 +1594,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
} }
hasValidBlock = true; hasValidBlock = true;
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock); destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
......
...@@ -69,7 +69,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp ...@@ -69,7 +69,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
getTableScanInfo(pOperator, &order, &scanFlag); getTableScanInfo(pOperator, &order, &scanFlag, false);
int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey; int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
...@@ -128,7 +128,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { ...@@ -128,7 +128,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
getTableScanInfo(pOperator, &order, &scanFlag); getTableScanInfo(pOperator, &order, &scanFlag, false);
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
......
...@@ -383,7 +383,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -383,7 +383,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
break; break;
} }
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
......
...@@ -289,7 +289,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { ...@@ -289,7 +289,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(downstream, &order, &scanFlag); int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
...@@ -441,7 +441,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp ...@@ -441,7 +441,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(downstream, &order, &scanFlag); int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
......
...@@ -1072,7 +1072,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1072,7 +1072,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag, true);
if (pInfo->scalarSupp.pExprInfo != NULL) { if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp; SExprSupp* pExprSup = &pInfo->scalarSupp;
...@@ -4294,7 +4294,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4294,7 +4294,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
} }
} }
getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag, false);
setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true); setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
...@@ -4621,7 +4621,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4621,7 +4621,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag, false);
setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true); setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册