提交 a4e38ed7 编写于 作者: H Haojun Liao

enh(query): return open/total cost for each operator.

上级 bbf3b384
...@@ -36,6 +36,8 @@ extern "C" { ...@@ -36,6 +36,8 @@ extern "C" {
#define EXPLAIN_SORT_FORMAT "Sort" #define EXPLAIN_SORT_FORMAT "Sort"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s" #define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s"
#define EXPLAIN_SESSION_FORMAT "Session" #define EXPLAIN_SESSION_FORMAT "Session"
#define EXPLAIN_STATE_WINDOW_FORMAT "StateWindow on Column %s"
#define EXPLAIN_PARITION_FORMAT "Partition on Column %s"
#define EXPLAIN_ORDER_FORMAT "Order: %s" #define EXPLAIN_ORDER_FORMAT "Order: %s"
#define EXPLAIN_FILTER_FORMAT "Filter: " #define EXPLAIN_FILTER_FORMAT "Filter: "
#define EXPLAIN_FILL_FORMAT "Fill: %s" #define EXPLAIN_FILL_FORMAT "Fill: %s"
......
...@@ -162,6 +162,16 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo ...@@ -162,6 +162,16 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = pSessNode->window.node.pChildren; pPhysiChildren = pSessNode->window.node.pChildren;
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pNode;
pPhysiChildren = pStateNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
SPartitionPhysiNode* partitionPhysiNode = (SPartitionPhysiNode*) pNode;
pPhysiChildren = partitionPhysiNode->node.pChildren;
break;
}
default: default:
qError("not supported physical node type %d", pNode->type); qError("not supported physical node type %d", pNode->type);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
...@@ -339,7 +349,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -339,7 +349,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTagScanNode->pScanCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTagScanNode->pScanCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
...@@ -734,6 +743,85 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -734,6 +743,85 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: {
SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_STATE_WINDOW_FORMAT, nodesGetNameFromColumnNode(((STargetNode*)pStateNode->pStateKey)->pExpr));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pStateNode->window.pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pStateNode->window.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pStateNode->window.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pStateNode->window.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pStateNode->window.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pStateNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
SPartitionPhysiNode *pPartNode = (SPartitionPhysiNode *)pNode;
SNode* p = nodesListGetNode(pPartNode->pPartitionKeys, 0);
EXPLAIN_ROW_NEW(level, EXPLAIN_PARITION_FORMAT, nodesGetNameFromColumnNode(p));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
// EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pPartNode->length);
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->totalRowSize);
// if (pPartNode->pGroupKeys) {
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
// EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pPartNode->pGroupKeys->length);
// }
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pPartNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pPartNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pPartNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
default: default:
qError("not supported physical node type %d", pNode->type); qError("not supported physical node type %d", pNode->type);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
......
...@@ -2719,8 +2719,9 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) { ...@@ -2719,8 +2719,9 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int64_t el = taosGetTimestampUs() - startTs; int64_t el = taosGetTimestampUs() - startTs;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
pLoadInfo->totalElapsed += el; pLoadInfo->totalElapsed += el;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
...@@ -2921,6 +2922,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2921,6 +2922,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
pLoadInfo->totalSize); pLoadInfo->totalSize);
} }
pOperator->resultInfo.totalRows += pRes->info.rows;
return pExchangeInfo->pResult; return pExchangeInfo->pResult;
} }
} }
...@@ -2930,10 +2932,10 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2930,10 +2932,10 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t st = taosGetTimestampUs();
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
if (pExchangeInfo->seqLoadData) { if (!pExchangeInfo->seqLoadData) {
// do nothing for sequentially load data
} else {
int32_t code = prepareConcurrentlyLoad(pOperator); int32_t code = prepareConcurrentlyLoad(pOperator);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2941,6 +2943,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2941,6 +2943,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
} }
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2968,15 +2971,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { ...@@ -2968,15 +2971,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
} else { } else {
return concurrentlyLoadRemoteData(pOperator); return concurrentlyLoadRemoteData(pOperator);
} }
#if 0
_error:
taosMemoryFreeClear(pMsg);
taosMemoryFreeClear(pMsgSendInfo);
terrno = pTaskInfo->code;
return NULL;
#endif
} }
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) { static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
...@@ -3005,12 +2999,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p ...@@ -3005,12 +2999,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo)); SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
taosMemoryFreeClear(pInfo); goto _error;
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
} }
size_t numOfSources = LIST_LENGTH(pSources); size_t numOfSources = LIST_LENGTH(pSources);
...@@ -3035,18 +3025,17 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p ...@@ -3035,18 +3025,17 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p
tsem_init(&pInfo->ready, 0, 0); tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator"; pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->numOfExprs = pBlock->info.numOfCols; pOperator->numOfExprs = pBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL); destroyExchangeOperatorInfo, NULL, NULL, NULL);
pInfo->pTransporter = pTransporter; pInfo->pTransporter = pTransporter;
return pOperator; return pOperator;
_error: _error:
......
...@@ -269,15 +269,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -269,15 +269,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
size_t rows = pRes->info.rows;
if (rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
pOperator->resultInfo.totalRows += rows;
return (pRes->info.rows == 0)? NULL:pRes; return (pRes->info.rows == 0)? NULL:pRes;
} }
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN; int32_t scanFlag = MAIN_SCAN;
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
...@@ -317,6 +322,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -317,6 +322,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
while(1) { while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, NULL); doFilter(pInfo->pCondition, pRes, NULL);
...@@ -545,7 +552,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { ...@@ -545,7 +552,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
// try next group data // try next group data
pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter); pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
if (pInfo->pGroupIter == NULL) { if (pInfo->pGroupIter == NULL) {
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
return NULL; return NULL;
} }
...@@ -562,6 +569,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { ...@@ -562,6 +569,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows;
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
...@@ -578,6 +587,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { ...@@ -578,6 +587,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
return buildPartitionResult(pOperator); return buildPartitionResult(pOperator);
} }
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
...@@ -589,6 +599,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { ...@@ -589,6 +599,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
doHashPartition(pOperator, pBlock); doHashPartition(pOperator, pBlock);
} }
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
blockDataEnsureCapacity(pRes, 4096); blockDataEnsureCapacity(pRes, 4096);
return buildPartitionResult(pOperator); return buildPartitionResult(pOperator);
...@@ -632,13 +644,14 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -632,13 +644,14 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
} }
pOperator->name = "PartitionOperator"; pOperator->name = "PartitionOperator";
pOperator->blocking = true; pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResultBlock; pInfo->binfo.pRes = pResultBlock;
pOperator->numOfExprs = numOfCols; pOperator->numOfExprs = numOfCols;
pOperator->pExpr = pExprInfo; pOperator->pExpr = pExprInfo;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
NULL, NULL, NULL); NULL, NULL, NULL);
......
...@@ -1640,7 +1640,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1640,7 +1640,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
count += 1; count += 1;
if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) { if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
pOperator->status = OP_EXEC_DONE; doSetOperatorCompleted(pOperator);
} }
} }
...@@ -1652,6 +1652,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { ...@@ -1652,6 +1652,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
} }
pRes->info.rows = count; pRes->info.rows = count;
pOperator->resultInfo.totalRows += count;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes; return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
} }
......
...@@ -943,6 +943,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -943,6 +943,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
} }
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
...@@ -957,6 +958,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -957,6 +958,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
doStateWindowAggImpl(pOperator, pInfo, pBlock); doStateWindowAggImpl(pOperator, pInfo, pBlock);
} }
pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0;
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
...@@ -967,7 +970,10 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -967,7 +970,10 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; size_t rows = pBInfo->pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL : pBInfo->pRes;
} }
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
...@@ -1419,7 +1425,9 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1419,7 +1425,9 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
int32_t order = TSDB_ORDER_ASC; int64_t st = taosGetTimestampUs();
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
...@@ -1435,6 +1443,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1435,6 +1443,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
doSessionWindowAggImpl(pOperator, pInfo, pBlock); doSessionWindowAggImpl(pOperator, pInfo, pBlock);
} }
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo); closeAllResultRows(&pBInfo->resultRowInfo);
...@@ -1446,7 +1456,10 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { ...@@ -1446,7 +1456,10 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; size_t rows = pBInfo->pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL : pBInfo->pRes;
} }
static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册