提交 cad32b12 编写于 作者: G Ganlin Zhao

add inherit from upstream order option for exchange

上级 5ad0d18c
......@@ -732,7 +732,7 @@ void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int64_t numOfRows, int3
STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInterval* pInterval, int32_t order);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
extern void doDestroyExchangeOperatorInfo(void* param);
......
......@@ -1449,7 +1449,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
return TSDB_CODE_SUCCESS;
}
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) {
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder) {
// todo add more information about exchange operation
int32_t type = pOperator->operatorType;
if (type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN ||
......@@ -1459,7 +1459,9 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
*scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
// for exchange operator inherit order from upstream and do not overwrite here
if (!inheritUsOrder) {
*order = TSDB_ORDER_ASC;
}
*scanFlag = MAIN_SCAN;
return TSDB_CODE_SUCCESS;
} else if (type == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
......@@ -1476,7 +1478,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
if (pOperator->pDownstream == NULL || pOperator->pDownstream[0] == NULL) {
return TSDB_CODE_INVALID_PARA;
} else {
return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag);
return getTableScanInfo(pOperator->pDownstream[0], order, scanFlag, inheritUsOrder);
}
}
}
......@@ -1592,7 +1594,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
hasValidBlock = true;
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) {
destroyDataBlockForEmptyInput(blockAllocated, &pBlock);
T_LONG_JMP(pTaskInfo->env, code);
......
......@@ -69,7 +69,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
getTableScanInfo(pOperator, &order, &scanFlag);
getTableScanInfo(pOperator, &order, &scanFlag, false);
int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
......@@ -128,7 +128,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
getTableScanInfo(pOperator, &order, &scanFlag);
getTableScanInfo(pOperator, &order, &scanFlag, false);
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
if (pResBlock->info.rows > 0) {
......
......@@ -383,7 +383,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
break;
}
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......
......@@ -289,7 +289,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -441,7 +441,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
SExprSupp* pSup = &pOperator->exprSupp;
// the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
int32_t code = getTableScanInfo(downstream, &order, &scanFlag, false);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......
......@@ -1072,7 +1072,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
break;
}
getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag);
getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag, true);
if (pInfo->scalarSupp.pExprInfo != NULL) {
SExprSupp* pExprSup = &pInfo->scalarSupp;
......@@ -4294,7 +4294,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
}
}
getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag);
getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag, false);
setInputDataBlock(pSup, pBlock, pIaInfo->inputOrder, scanFlag, true);
doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes);
......@@ -4621,7 +4621,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
break;
}
getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag);
getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag, false);
setInputDataBlock(pExpSupp, pBlock, iaInfo->inputOrder, scanFlag, true);
doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册