未验证 提交 2dd4d905 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #12916 from taosdata/feature/3_liaohj

enh(query): return open/total cost for each operator.
......@@ -36,6 +36,8 @@ extern "C" {
#define EXPLAIN_SORT_FORMAT "Sort"
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s"
#define EXPLAIN_SESSION_FORMAT "Session"
#define EXPLAIN_STATE_WINDOW_FORMAT "StateWindow on Column %s"
#define EXPLAIN_PARITION_FORMAT "Partition on Column %s"
#define EXPLAIN_ORDER_FORMAT "Order: %s"
#define EXPLAIN_FILTER_FORMAT "Filter: "
#define EXPLAIN_FILL_FORMAT "Fill: %s"
......
......@@ -162,6 +162,16 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
pPhysiChildren = pSessNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: {
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*) pNode;
pPhysiChildren = pStateNode->window.node.pChildren;
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
SPartitionPhysiNode* partitionPhysiNode = (SPartitionPhysiNode*) pNode;
pPhysiChildren = partitionPhysiNode->node.pChildren;
break;
}
default:
qError("not supported physical node type %d", pNode->type);
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
......@@ -339,7 +349,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTagScanNode->pScanCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......@@ -734,6 +743,85 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW: {
SStateWinodwPhysiNode *pStateNode = (SStateWinodwPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_STATE_WINDOW_FORMAT, nodesGetNameFromColumnNode(((STargetNode*)pStateNode->pStateKey)->pExpr));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pStateNode->window.pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pStateNode->window.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pStateNode->window.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pStateNode->window.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pStateNode->window.node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pStateNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_PARTITION: {
SPartitionPhysiNode *pPartNode = (SPartitionPhysiNode *)pNode;
SNode* p = nodesListGetNode(pPartNode->pPartitionKeys, 0);
EXPLAIN_ROW_NEW(level, EXPLAIN_PARITION_FORMAT, nodesGetNameFromColumnNode(p));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
// EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pPartNode->length);
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->totalRowSize);
// if (pPartNode->pGroupKeys) {
// EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
// EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pPartNode->pGroupKeys->length);
// }
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pPartNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPartNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pPartNode->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pPartNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
default:
qError("not supported physical node type %d", pNode->type);
return TSDB_CODE_QRY_APP_ERROR;
......
......@@ -2719,8 +2719,9 @@ static void* setAllSourcesCompleted(SOperatorInfo* pOperator, int64_t startTs) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int64_t el = taosGetTimestampUs() - startTs;
int64_t el = taosGetTimestampUs() - startTs;
SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo;
pLoadInfo->totalElapsed += el;
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
......@@ -2921,6 +2922,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
pLoadInfo->totalSize);
}
pOperator->resultInfo.totalRows += pRes->info.rows;
return pExchangeInfo->pResult;
}
}
......@@ -2930,10 +2932,10 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}
int64_t st = taosGetTimestampUs();
SExchangeInfo* pExchangeInfo = pOperator->info;
if (pExchangeInfo->seqLoadData) {
// do nothing for sequentially load data
} else {
if (!pExchangeInfo->seqLoadData) {
int32_t code = prepareConcurrentlyLoad(pOperator);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -2941,6 +2943,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
}
OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
return TSDB_CODE_SUCCESS;
}
......@@ -2968,15 +2971,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
} else {
return concurrentlyLoadRemoteData(pOperator);
}
#if 0
_error:
taosMemoryFreeClear(pMsg);
taosMemoryFreeClear(pMsgSendInfo);
terrno = pTaskInfo->code;
return NULL;
#endif
}
static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
......@@ -3005,12 +2999,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p
SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
goto _error;
}
size_t numOfSources = LIST_LENGTH(pSources);
......@@ -3035,18 +3025,17 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* p
tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator";
pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL);
pInfo->pTransporter = pTransporter;
return pOperator;
_error:
......
......@@ -269,15 +269,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
if (pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
size_t rows = pRes->info.rows;
if (rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
pOperator->resultInfo.totalRows += rows;
return (pRes->info.rows == 0)? NULL:pRes;
}
int32_t order = TSDB_ORDER_ASC;
int32_t scanFlag = MAIN_SCAN;
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
......@@ -317,6 +322,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity);
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
while(1) {
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
doFilter(pInfo->pCondition, pRes, NULL);
......@@ -545,7 +552,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
// try next group data
pInfo->pGroupIter = taosHashIterate(pInfo->pGroupSet, pInfo->pGroupIter);
if (pInfo->pGroupIter == NULL) {
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
return NULL;
}
......@@ -562,6 +569,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows;
return pInfo->binfo.pRes;
}
......@@ -578,6 +587,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
return buildPartitionResult(pOperator);
}
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
......@@ -589,6 +599,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
doHashPartition(pOperator, pBlock);
}
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
pOperator->status = OP_RES_TO_RETURN;
blockDataEnsureCapacity(pRes, 4096);
return buildPartitionResult(pOperator);
......@@ -632,13 +644,14 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
}
pOperator->name = "PartitionOperator";
pOperator->blocking = true;
pOperator->blocking = true;
pOperator->status = OP_NOT_OPENED;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
pInfo->binfo.pRes = pResultBlock;
pOperator->numOfExprs = numOfCols;
pOperator->numOfExprs = numOfCols;
pOperator->pExpr = pExprInfo;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
NULL, NULL, NULL);
......
......@@ -1640,7 +1640,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
count += 1;
if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
pOperator->status = OP_EXEC_DONE;
doSetOperatorCompleted(pOperator);
}
}
......@@ -1652,6 +1652,8 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
}
pRes->info.rows = count;
pOperator->resultInfo.totalRows += count;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
......
......@@ -943,6 +943,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
}
int32_t order = TSDB_ORDER_ASC;
int64_t st = taosGetTimestampUs();
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
......@@ -957,6 +958,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
doStateWindowAggImpl(pOperator, pInfo, pBlock);
}
pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0;
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
......@@ -967,7 +970,10 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator);
}
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
size_t rows = pBInfo->pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL : pBInfo->pRes;
}
static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
......@@ -1419,7 +1425,9 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes;
}
int32_t order = TSDB_ORDER_ASC;
int64_t st = taosGetTimestampUs();
int32_t order = TSDB_ORDER_ASC;
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
......@@ -1435,6 +1443,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
}
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pBInfo->resultRowInfo);
......@@ -1446,7 +1456,10 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
doSetOperatorCompleted(pOperator);
}
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
size_t rows = pBInfo->pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
return (rows == 0)? NULL : pBInfo->pRes;
}
static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册