diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index d835b37c2497c241d52a243d34ab4ab63e76c12a..ffec03b65adc38db15d3e57bb11dccb8b0f93a92 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -898,7 +898,9 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; while(1) { bool prev = *newgroup; + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { *newgroup = prev; break; @@ -966,7 +968,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { SSDataBlock* pBlock = NULL; if (pInfo->currentGroupOffset == 0) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -974,7 +978,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { while ((*newgroup) == false) { // ignore the remain blocks + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -986,7 +992,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { return pBlock; } + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -1000,7 +1009,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { } while ((*newgroup) == false) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 74dbe42eeb5f784b6be65437b7df390fc1857ea6..029515050e3d0a884697c12fd73f0eafd40ec87d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -825,7 +825,10 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup) SJoinStatus* pStatus = &pJoinInfo->status[i]; if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) { tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream); + + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); pStatus->index = 0; if (pStatus->pBlock == NULL) { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index bc589c55b3d0d22ad672208b666a0e77075287c6..8279c58b24796c734b39e97e9a8e953e0248332f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -133,6 +133,28 @@ typedef struct STableQueryInfo { SResultRowInfo resInfo; } STableQueryInfo; +typedef enum { + QUERY_PROF_BEFORE_OPERATOR_EXEC = 0, + QUERY_PROF_AFTER_OPERATOR_EXEC, + QUERY_PROF_QUERY_ABORT +} EQueryProfEventType; + +typedef struct { + EQueryProfEventType eventType; + int64_t eventTime; + + union { + uint8_t operatorType; //for operator event + int32_t abortCode; //for query abort event + }; +} SQueryProfEvent; + +typedef struct { + uint8_t operatorType; + int64_t sumSelfTime; + int64_t sumRunTimes; +} SOperatorProfResult; + typedef struct SQueryCostInfo { uint64_t loadStatisTime; uint64_t loadFileBlockTime; @@ -154,6 +176,9 @@ typedef struct SQueryCostInfo { uint64_t tableInfoSize; uint64_t hashSize; uint64_t numOfTimeWindows; + + SArray* queryProfEvents; //SArray + SHashObj* operatorProfResults; //map } SQueryCostInfo; typedef struct { @@ -587,7 +612,12 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data); size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); void setQueryKilled(SQInfo *pQInfo); + +void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); +void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code); +void calculateOperatorProfResults(SQInfo* pQInfo); void queryCostStatis(SQInfo *pQInfo); + void freeQInfo(SQInfo *pQInfo); void freeQueryAttr(SQueryAttr *pQuery); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index d56aeebdbfa6bf3d9838419410a999c09ab65d8d..1cd50df46b89456a5c651e9047df5bc27f2e0ad7 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -3785,6 +3785,88 @@ int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutp return pOutput->info.rows; } +void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType) { + SQueryProfEvent event; + event.eventType = eventType; + event.eventTime = taosGetTimestampUs(); + event.operatorType = operatorInfo->operatorType; + + SQInfo* qInfo = operatorInfo->pRuntimeEnv->qinfo; + taosArrayPush(qInfo->summary.queryProfEvents, &event); +} + +void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code) { + SQueryProfEvent event; + event.eventType = QUERY_PROF_QUERY_ABORT; + event.eventTime = taosGetTimestampUs(); + event.abortCode = code; + + taosArrayPush(pQInfo->summary.queryProfEvents, &event); +} + +typedef struct { + uint8_t operatorType; + int64_t beginTime; + int64_t endTime; + int64_t selfTime; + int64_t descendantsTime; +} SOperatorStackItem; + +static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* event, SArray* opStack, SHashObj* profResults) { + item->endTime = event->eventTime; + item->selfTime = (item->endTime - item->beginTime) - (item->descendantsTime); + + for (int32_t j = 0; j < taosArrayGetSize(opStack); ++j) { + SOperatorStackItem* ancestor = taosArrayGet(opStack, j); + ancestor->descendantsTime += item->selfTime; + } + + uint8_t operatorType = item->operatorType; + SOperatorProfResult* result = taosHashGet(profResults, &operatorType, sizeof(operatorType)); + if (result != NULL) { + result->sumRunTimes++; + result->sumSelfTime += item->selfTime; + } else { + SOperatorProfResult opResult; + opResult.operatorType = operatorType; + opResult.sumSelfTime = item->selfTime; + opResult.sumRunTimes = 1; + taosHashPut(profResults, &(operatorType), sizeof(operatorType), + &opResult, sizeof(opResult)); + } +} + +void calculateOperatorProfResults(SQInfo* pQInfo) { + SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem)); + if (opStack == NULL) { + return; + } + size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents); + SHashObj* profResults = pQInfo->summary.operatorProfResults; + + for (int i = 0; i < size; ++i) { + SQueryProfEvent* event = taosArrayGet(pQInfo->summary.queryProfEvents, i); + if (event->eventType == QUERY_PROF_BEFORE_OPERATOR_EXEC) { + SOperatorStackItem opItem; + opItem.operatorType = event->operatorType; + opItem.beginTime = event->eventTime; + opItem.descendantsTime = 0; + taosArrayPush(opStack, &opItem); + } else if (event->eventType == QUERY_PROF_AFTER_OPERATOR_EXEC) { + SOperatorStackItem* item = taosArrayPop(opStack); + assert(item->operatorType == event->operatorType); + doOperatorExecProfOnce(item, event, opStack, profResults); + } else if (event->eventType == QUERY_PROF_QUERY_ABORT) { + SOperatorStackItem* item; + while ((item = taosArrayPop(opStack)) != NULL) { + doOperatorExecProfOnce(item, event, opStack, profResults); + } + } + } + + taosArrayDestroy(opStack); +} + void queryCostStatis(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryCostInfo *pSummary = &pQInfo->summary; @@ -3805,6 +3887,8 @@ void queryCostStatis(SQInfo *pQInfo) { pSummary->numOfTimeWindows = 0; } + calculateOperatorProfResults(pQInfo); + qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, @@ -3812,6 +3896,13 @@ void queryCostStatis(SQInfo *pQInfo) { qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); + + SOperatorProfResult* opRes = taosHashIterate(pSummary->operatorProfResults, NULL); + while (opRes != NULL) { + qDebug("QInfo:0x%"PRIx64" :cost summary: operator : %d, exec times: %"PRId64", self time: %"PRId64, pQInfo->qId, + opRes->operatorType, opRes->sumRunTimes, opRes->sumSelfTime ); + opRes = taosHashIterate(pSummary->operatorProfResults, opRes); + } } //static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { @@ -4213,6 +4304,9 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr // create runtime environment int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables; pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); + pQInfo->summary.queryProfEvents = taosArrayInit(512, sizeof(SQueryProfEvent)); + pQInfo->summary.operatorProfResults = + taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK); code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param); if (code != TSDB_CODE_SUCCESS) { @@ -4837,7 +4931,10 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { SOperatorInfo* upstream = pOperator->upstream[0]; while(1) { + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { break; } @@ -4892,7 +4989,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { SOperatorInfo* upstream = pOperator->upstream[0]; while(1) { + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { break; } @@ -4972,7 +5072,10 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { bool prevVal = *newgroup; // The upstream exec may change the value of the newgroup, so use a local variable instead. + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { assert(*newgroup == false); @@ -5032,7 +5135,10 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { SSDataBlock* pBlock = NULL; while (1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -5082,7 +5188,10 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; while (1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { break; } @@ -5127,7 +5236,10 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { SOperatorInfo* upstream = pOperator->upstream[0]; while(1) { + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { break; } @@ -5180,7 +5292,10 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { SOperatorInfo* upstream = pOperator->upstream[0]; while(1) { + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { break; } @@ -5308,7 +5423,10 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { STimeWindow win = pQueryAttr->window; SOperatorInfo* upstream = pOperator->upstream[0]; while (1) { + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { break; } @@ -5366,7 +5484,9 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { SOperatorInfo* upstream = pOperator->upstream[0]; while(1) { + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } @@ -5417,7 +5537,9 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { SOperatorInfo* upstream = pOperator->upstream[0]; while(1) { + publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = upstream->exec(upstream, newgroup); + publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } @@ -5483,7 +5605,10 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { } while(1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + if (*newgroup) { assert(pBlock != NULL); } @@ -6153,7 +6278,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { pRes->info.rows = 0; SSDataBlock* pBlock = NULL; while(1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + if (pBlock == NULL) { setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); pOperator->status = OP_EXEC_DONE; @@ -7479,6 +7607,9 @@ void freeQInfo(SQInfo *pQInfo) { tfree(pQInfo->pBuf); tfree(pQInfo->sql); + taosArrayDestroy(pQInfo->summary.queryProfEvents); + taosHashCleanup(pQInfo->summary.operatorProfResults); + taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); pQInfo->signature = 0; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 38ef81e7938a3635273c0cfa2cb4e86ca2e35c1e..787cb2f7d1a34f8958977eb85cd3c2621ff9a047 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -232,6 +232,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { // error occurs, record the error code and return to client int32_t ret = setjmp(pQInfo->runtimeEnv.env); if (ret != TSDB_CODE_SUCCESS) { + publishQueryAbortEvent(pQInfo, ret); pQInfo->code = ret; qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code)); return doBuildResCheck(pQInfo); @@ -240,7 +241,9 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId); bool newgroup = false; + publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup); + publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC); pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv); if (isQueryKilled(pQInfo)) {