From 60e7e2ae6fabd3c8a47368ac8343ab96b5d4a936 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 24 May 2022 11:29:51 +0800 Subject: [PATCH] enh(query): add more information regarding analyze sql execution. --- include/common/tmsg.h | 17 +- include/libs/command/command.h | 2 +- source/common/src/tmsg.c | 12 +- source/libs/command/inc/commandInt.h | 2 +- source/libs/command/src/explain.c | 33 +++- source/libs/executor/inc/executorimpl.h | 61 ++----- source/libs/executor/src/executorMain.c | 13 -- source/libs/executor/src/executorimpl.c | 162 ++++-------------- source/libs/executor/src/groupoperator.c | 25 +-- source/libs/executor/src/joinoperator.c | 4 - source/libs/executor/src/scanoperator.c | 44 +++-- source/libs/executor/src/timewindowoperator.c | 24 +-- 12 files changed, 160 insertions(+), 239 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9104e8a423..aaf7f04378 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1210,9 +1210,10 @@ typedef struct { } SRetrieveMetaTableRsp; typedef struct SExplainExecInfo { - uint64_t startupCost; - uint64_t totalCost; + double startupCost; + double totalCost; uint64_t numOfRows; + uint32_t verboseLen; void* verboseInfo; } SExplainExecInfo; @@ -1221,6 +1222,18 @@ typedef struct { SExplainExecInfo* subplanInfo; } SExplainRsp; +typedef struct STableScanAnalyzeInfo { + uint64_t totalRows; + uint64_t totalCheckedRows; + uint32_t totalBlocks; + uint32_t loadBlocks; + uint32_t loadBlockStatis; + uint32_t skipBlocks; + uint32_t filterOutBlocks; + double elapsedTime; + uint64_t filterTime; +} STableScanAnalyzeInfo; + int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp); diff --git a/include/libs/command/command.h b/include/libs/command/command.h index 0cd566ee46..aee6b83783 100644 --- a/include/libs/command/command.h +++ b/include/libs/command/command.h @@ -24,7 +24,7 @@ int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp); int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp); int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int64_t startTs); int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp); -int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp); +int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp); void qExplainFreeCtx(SExplainCtx *pCtx); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index cc333ae5c8..12c9e8aa4d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -3318,9 +3318,11 @@ int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { if (tEncodeI32(&encoder, pRsp->numOfPlans) < 0) return -1; for (int32_t i = 0; i < pRsp->numOfPlans; ++i) { SExplainExecInfo *info = &pRsp->subplanInfo[i]; - if (tEncodeU64(&encoder, info->startupCost) < 0) return -1; - if (tEncodeU64(&encoder, info->totalCost) < 0) return -1; + if (tEncodeDouble(&encoder, info->startupCost) < 0) return -1; + if (tEncodeDouble(&encoder, info->totalCost) < 0) return -1; if (tEncodeU64(&encoder, info->numOfRows) < 0) return -1; + if (tEncodeU32(&encoder, info->verboseLen) < 0) return -1; + if (tEncodeBinary(&encoder, info->verboseInfo, info->verboseLen) < 0) return -1; } tEndEncode(&encoder); @@ -3341,9 +3343,11 @@ int32_t tDeserializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { if (pRsp->subplanInfo == NULL) return -1; } for (int32_t i = 0; i < pRsp->numOfPlans; ++i) { - if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].startupCost) < 0) return -1; - if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1; + if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].startupCost) < 0) return -1; + if (tDecodeDouble(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1; if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1; + if (tDecodeU32(&decoder, &pRsp->subplanInfo[i].verboseLen) < 0) return -1; + if (tDecodeBinary(&decoder, (const uint8_t**) &pRsp->subplanInfo[i].verboseInfo, &pRsp->subplanInfo[i].verboseLen) < 0) return -1; } tEndDecode(&decoder); diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 775dee28a4..16d7ec0c4a 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -60,7 +60,7 @@ extern "C" { #define EXPLAIN_GROUPS_FORMAT "groups=%d" #define EXPLAIN_WIDTH_FORMAT "width=%d" #define EXPLAIN_FUNCTIONS_FORMAT "functions=%d" -#define EXPLAIN_EXECINFO_FORMAT "cost=%" PRIu64 "..%" PRIu64 " rows=%" PRIu64 +#define EXPLAIN_EXECINFO_FORMAT "cost=%.3f..%.3f rows=%" PRIu64 typedef struct SExplainGroup { int32_t nodeNum; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 2e94ec8d0c..03a4e67db4 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -381,6 +381,35 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + // basic analyze output + if (EXPLAIN_MODE_ANALYZE == ctx->mode) { + EXPLAIN_ROW_NEW(level + 1, "I/O: "); + + int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo); + for (int32_t i = 0; i < nodeNum; ++i) { + SExplainExecInfo * execInfo = taosArrayGet(pResNode->pExecInfo, i); + STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo; + + EXPLAIN_ROW_APPEND("total_blocks=%d", pScanInfo->totalBlocks); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + + EXPLAIN_ROW_APPEND("load_blocks=%d", pScanInfo->loadBlocks); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + + EXPLAIN_ROW_APPEND("load_block_SMAs=%d", pScanInfo->loadBlockStatis); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + + EXPLAIN_ROW_APPEND("total_rows=%" PRIu64, pScanInfo->totalRows); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + + EXPLAIN_ROW_APPEND("check_rows=%" PRIu64, pScanInfo->totalCheckedRows); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + } + + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + if (verbose) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, @@ -390,8 +419,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, - pTblScanNode->scanRange.ekey); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); @@ -637,6 +665,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0dacbba8e5..8ac320b9aa 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -86,43 +86,12 @@ typedef struct STableQueryInfo { // SVariant tag; } STableQueryInfo; -typedef enum { - QUERY_PROF_BEFORE_OPERATOR_EXEC = 0, - QUERY_PROF_AFTER_OPERATOR_EXEC, - QUERY_PROF_QUERY_ABORT -} EQueryProfEventType; - -typedef struct { - EQueryProfEventType eventType; - int64_t eventTime; - - union { - uint8_t operatorType; // for operator event - int32_t abortCode; // for query abort event - }; -} SQueryProfEvent; - -typedef struct { - uint8_t operatorType; - int64_t sumSelfTime; - int64_t sumRunTimes; -} SOperatorProfResult; - typedef struct SLimit { int64_t limit; int64_t offset; } SLimit; -typedef struct SFileBlockLoadRecorder { - uint64_t totalRows; - uint64_t totalCheckedRows; - uint32_t totalBlocks; - uint32_t loadBlocks; - uint32_t loadBlockStatis; - uint32_t skipBlocks; - uint32_t filterOutBlocks; - uint64_t elapsedTime; -} SFileBlockLoadRecorder; +typedef struct STableScanAnalyzeInfo SFileBlockLoadRecorder; typedef struct STaskCostInfo { int64_t created; @@ -152,8 +121,8 @@ typedef struct STaskCostInfo { } STaskCostInfo; typedef struct SOperatorCostInfo { - uint64_t openCost; - uint64_t totalCost; + double openCost; + double totalCost; } SOperatorCostInfo; // The basic query information extracted from the SQueryInfo tree to support the @@ -200,7 +169,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggS typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); typedef void (*__optr_close_fn_t)(void* param, int32_t num); -typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain); +typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); typedef struct STaskIdInfo { uint64_t queryId; // this is also a request id @@ -264,14 +233,14 @@ enum { }; typedef struct SOperatorFpSet { - __optr_open_fn_t _openFn; // DO NOT invoke this function directly - __optr_fn_t getNextFn; - __optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it - __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP - __optr_close_fn_t closeFn; - __optr_encode_fn_t encodeResultRow; - __optr_decode_fn_t decodeResultRow; - __optr_get_explain_fn_t getExplainFn; + __optr_open_fn_t _openFn; // DO NOT invoke this function directly + __optr_fn_t getNextFn; + __optr_fn_t getStreamResFn; // execute the aggregate in the stream model, todo remove it + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP + __optr_close_fn_t closeFn; + __optr_encode_fn_t encodeResultRow; + __optr_decode_fn_t decodeResultRow; + __optr_explain_fn_t getExplainFn; } SOperatorFpSet; typedef struct SOperatorInfo { @@ -656,7 +625,7 @@ typedef struct SJoinOperatorInfo { SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, - __optr_decode_fn_t decode, __optr_get_explain_fn_t explain); + __optr_decode_fn_t decode, __optr_explain_fn_t explain); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); @@ -775,10 +744,6 @@ bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); void setTaskKilled(SExecTaskInfo* pTaskInfo); - -void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); -void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code); - void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index d4d8696aba..6689def7a7 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -30,13 +30,6 @@ #include "tlosertree.h" #include "ttypes.h" -typedef struct STaskMgmt { - TdThreadMutex lock; - SCacheObj *qinfoPool; // query handle pool - int32_t vgId; - bool closed; -} STaskMgmt; - int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) { assert(readHandle != NULL && pSubplan != NULL); @@ -131,7 +124,6 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { // error occurs, record the error code and return to client int32_t ret = setjmp(pTaskInfo->env); if (ret != TSDB_CODE_SUCCESS) { - publishQueryAbortEvent(pTaskInfo, ret); pTaskInfo->code = ret; cleanUpUdfs(); qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), @@ -141,16 +133,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); - publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); - int64_t st = taosGetTimestampUs(); *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); uint64_t el = (taosGetTimestampUs() - st); pTaskInfo->cost.elapsedTime += el; - - publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); - if (NULL == *pRes) { *useconds = pTaskInfo->cost.elapsedTime; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 4475cb9e62..c7a76bb9fe 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -125,6 +125,8 @@ static void destroySysTableScannerOperatorInfo(void* param, int32_t numOfOutput) void doSetOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; + + pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start * 1000)/1000.0; if (pOperator->pTaskInfo != NULL) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } @@ -138,7 +140,7 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, - __optr_decode_fn_t decode, __optr_get_explain_fn_t explain) { + __optr_decode_fn_t decode, __optr_explain_fn_t explain) { SOperatorFpSet fpSet = { ._openFn = openFn, .getNextFn = nextFn, @@ -2136,102 +2138,6 @@ int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock return pBlock->info.rows; } -void publishOperatorProfEvent(SOperatorInfo* pOperator, EQueryProfEventType eventType) { - SQueryProfEvent event = {0}; - - event.eventType = eventType; - event.eventTime = taosGetTimestampUs(); - event.operatorType = pOperator->operatorType; - // if (pQInfo->summary.queryProfEvents) { - // taosArrayPush(pQInfo->summary.queryProfEvents, &event); - // } -} - -void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code) { - SQueryProfEvent event; - event.eventType = QUERY_PROF_QUERY_ABORT; - event.eventTime = taosGetTimestampUs(); - event.abortCode = code; - - if (pTaskInfo->cost.queryProfEvents) { - taosArrayPush(pTaskInfo->cost.queryProfEvents, &event); - } -} - -typedef struct { - uint8_t operatorType; - int64_t beginTime; - int64_t endTime; - int64_t selfTime; - int64_t descendantsTime; -} SOperatorStackItem; - -static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* event, SArray* opStack, - SHashObj* profResults) { - item->endTime = event->eventTime; - item->selfTime = (item->endTime - item->beginTime) - (item->descendantsTime); - - for (int32_t j = 0; j < taosArrayGetSize(opStack); ++j) { - SOperatorStackItem* ancestor = taosArrayGet(opStack, j); - ancestor->descendantsTime += item->selfTime; - } - - uint8_t operatorType = item->operatorType; - SOperatorProfResult* result = taosHashGet(profResults, &operatorType, sizeof(operatorType)); - if (result != NULL) { - result->sumRunTimes++; - result->sumSelfTime += item->selfTime; - } else { - SOperatorProfResult opResult; - opResult.operatorType = operatorType; - opResult.sumSelfTime = item->selfTime; - opResult.sumRunTimes = 1; - taosHashPut(profResults, &(operatorType), sizeof(operatorType), &opResult, sizeof(opResult)); - } -} - -void calculateOperatorProfResults(void) { - // if (pQInfo->summary.queryProfEvents == NULL) { - // // qDebug("QInfo:0x%"PRIx64" query prof events array is null", pQInfo->qId); - // return; - // } - // - // if (pQInfo->summary.operatorProfResults == NULL) { - // // qDebug("QInfo:0x%"PRIx64" operator prof results hash is null", pQInfo->qId); - // return; - // } - - SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem)); - if (opStack == NULL) { - return; - } -#if 0 - size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents); - SHashObj* profResults = pQInfo->summary.operatorProfResults; - - for (int i = 0; i < size; ++i) { - SQueryProfEvent* event = taosArrayGet(pQInfo->summary.queryProfEvents, i); - if (event->eventType == QUERY_PROF_BEFORE_OPERATOR_EXEC) { - SOperatorStackItem opItem; - opItem.operatorType = event->operatorType; - opItem.beginTime = event->eventTime; - opItem.descendantsTime = 0; - taosArrayPush(opStack, &opItem); - } else if (event->eventType == QUERY_PROF_AFTER_OPERATOR_EXEC) { - SOperatorStackItem* item = taosArrayPop(opStack); - assert(item->operatorType == event->operatorType); - doOperatorExecProfOnce(item, event, opStack, profResults); - } else if (event->eventType == QUERY_PROF_QUERY_ABORT) { - SOperatorStackItem* item; - while ((item = taosArrayPop(opStack)) != NULL) { - doOperatorExecProfOnce(item, event, opStack, profResults); - } - } - } -#endif - taosArrayDestroy(opStack); -} - void queryCostStatis(SExecTaskInfo* pTaskInfo) { STaskCostInfo* pSummary = &pTaskInfo->cost; @@ -2264,15 +2170,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { // qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, // hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, // pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); - - if (pSummary->operatorProfResults) { - SOperatorProfResult* opRes = taosHashIterate(pSummary->operatorProfResults, NULL); - while (opRes != NULL) { - // qDebug("QInfo:0x%" PRIx64 " :cost summary: operator : %d, exec times: %" PRId64 ", self time: %" PRId64, - // pQInfo->qId, opRes->operatorType, opRes->sumRunTimes, opRes->sumSelfTime); - opRes = taosHashIterate(pSummary->operatorProfResults, opRes); - } - } } // static void updateOffsetVal(STaskRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { @@ -3523,14 +3420,13 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SOptrBasicInfo* pInfo = &pAggInfo->binfo; SOperatorInfo* downstream = pOperator->pDownstream[0]; + int64_t st = taosGetTimestampUs(); + int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { break; } @@ -3576,6 +3472,8 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { closeAllResultRows(&pAggInfo->binfo.resultRowInfo); initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); OPTR_SET_OPENED(pOperator); + + pOperator->cost.openCost = (taosGetTimestampUs() - st)/1000.0; return TSDB_CODE_SUCCESS; } @@ -3590,6 +3488,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + doSetOperatorCompleted(pOperator); return NULL; } @@ -3599,7 +3498,10 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL; + size_t rows = blockDataGetNumOfRows(pInfo->pRes);//pInfo->pRes : NULL; + pOperator->resultInfo.totalRows += rows; + + return (rows == 0)? NULL:pInfo->pRes; } void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result, @@ -3825,22 +3727,25 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } #endif + int64_t st = 0; int32_t order = 0; int32_t scanFlag = 0; + if (pOperator->cost.openCost == 0) { + st = taosGetTimestampUs(); + } + SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { // The downstream exec may change the value of the newgroup, so use a local variable instead. - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + doSetOperatorCompleted(pOperator); break; } +#if 0 // Return result of the previous group in the firstly. if (false) { if (pRes->info.rows > 0) { @@ -3850,6 +3755,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { initCtxOutputBuffer(pInfo->pCtx, pOperator->numOfExprs); } } +#endif // the pDataBlock are always the same one, no need to call this again int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag); @@ -3875,8 +3781,14 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { pProjectInfo->curOutput += pInfo->pRes->info.rows; - // copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfExprs); - return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; + size_t rows = pInfo->pRes->info.rows; + pOperator->resultInfo.totalRows += rows; + + if (pOperator->cost.openCost == 0) { + pOperator->cost.openCost = (taosGetTimestampUs() - st)/ 1000.0; + } + + return (rows > 0)? pInfo->pRes:NULL; } static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, @@ -3933,10 +3845,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { SOperatorInfo* pDownstream = pOperator->pDownstream[0]; while (1) { - publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); - publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - if (*newgroup) { assert(pBlock != NULL); } @@ -5213,16 +5122,21 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo } } - (*pRes)[*resNum].numOfRows = operatorInfo->resultInfo.totalRows; - (*pRes)[*resNum].startupCost = operatorInfo->cost.openCost; - (*pRes)[*resNum].totalCost = operatorInfo->cost.totalCost; + SExplainExecInfo* pInfo = &(*pRes)[*resNum]; + + pInfo->numOfRows = operatorInfo->resultInfo.totalRows; + pInfo->startupCost = operatorInfo->cost.openCost; + pInfo->totalCost = operatorInfo->cost.totalCost; if (operatorInfo->fpSet.getExplainFn) { - int32_t code = (*operatorInfo->fpSet.getExplainFn)(operatorInfo, &(*pRes)->verboseInfo); + int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pInfo->verboseInfo, &pInfo->verboseLen); if (code) { - qError("operator getExplainFn failed, error:%s", tstrerror(code)); + qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code)); return code; } + } else { + pInfo->verboseLen = 0; + pInfo->verboseInfo = NULL; } ++(*resNum); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 7606374cdb..59786310d7 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -270,24 +270,29 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; + doSetOperatorCompleted(pOperator); } return (pRes->info.rows == 0)? NULL:pRes; } - int32_t order = TSDB_ORDER_ASC; + int32_t order = TSDB_ORDER_ASC; + int32_t scanFlag = MAIN_SCAN; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } + int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, code); + } + // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, MAIN_SCAN, true); + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true); // there is an scalar expression that needs to be calculated right before apply the group aggregation. if (pInfo->pScalarExprInfo != NULL) { @@ -297,7 +302,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } } - // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs); doHashGroupbyAgg(pOperator, pBlock); } @@ -319,7 +323,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { - pOperator->status = OP_EXEC_DONE; + doSetOperatorCompleted(pOperator); break; } @@ -328,7 +332,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } } - return (pRes->info.rows == 0)? NULL:pRes; + size_t rows = pRes->info.rows; + pOperator->resultInfo.totalRows += rows; + + return (rows == 0)? NULL:pRes; } SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, @@ -574,9 +581,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index d7d6d96346..ad9e4d63f0 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -98,9 +98,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { // todo extract method if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { SOperatorInfo* ds1 = pOperator->pDownstream[0]; - publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC); pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1); - publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC); pJoinInfo->leftPos = 0; if (pJoinInfo->pLeft == NULL) { @@ -111,9 +109,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { SOperatorInfo* ds2 = pOperator->pDownstream[1]; - publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC); pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2); - publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC); pJoinInfo->rightPos = 0; if (pJoinInfo->pRight == NULL) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4e27b6ac4f..c68e6897e4 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -253,9 +253,12 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca addTagPseudoColumnData(pTableScanInfo, pBlock); } - // todo record the filter time cost + int64_t st = taosGetTimestampMs(); doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + int64_t et = taosGetTimestampMs(); + pTableScanInfo->readRecorder.filterTime += (et - st); + if (pBlock->info.rows == 0) { pCost->filterOutBlocks += 1; qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), @@ -347,6 +350,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; SSDataBlock* pBlock = pTableScanInfo->pResBlock; + int64_t st = taosGetTimestampUs(); + while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { if (isTaskKilled(pOperator->pTaskInfo)) { longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); @@ -366,6 +371,10 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { continue; } + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st)/1000.0; + + pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; return pBlock; } @@ -452,6 +461,15 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { return interval; } +static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { + SFileBlockLoadRecorder* pRecorder = taosMemoryCalloc(1, sizeof(SFileBlockLoadRecorder)); + STableScanInfo* pTableScanInfo = pOptr->info; + *pRecorder = pTableScanInfo->readRecorder; + *pOptrExplain = pRecorder; + *len = sizeof(SFileBlockLoadRecorder); + return 0; +} + static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; taosMemoryFree(pTableScanInfo->pResBlock); @@ -509,14 +527,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pOperator->numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, NULL); - - static int32_t cost = 0; + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo, NULL, NULL, getTableScannerExecInfo); // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; - pOperator->cost.totalCost = ++cost; - pOperator->resultInfo.totalRows = ++cost; return pOperator; } @@ -1603,18 +1617,20 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { STR_TO_VARSTR(str, mr.me.name); colDataAppend(pDst, count, str, false); } else { // it is a tag value - if(pDst->info.type == TSDB_DATA_TYPE_JSON){ - const uint8_t *tmp = mr.me.ctbEntry.pTags; - char *data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1); - if(data == NULL){ - qError("doTagScan calloc error:%d", kvRowLen(tmp) + 1); - return NULL; + if (pDst->info.type == TSDB_DATA_TYPE_JSON) { + const uint8_t* tmp = mr.me.ctbEntry.pTags; + // TODO opt perf by realloc memory + char* data = taosMemoryCalloc(kvRowLen(tmp) + 1, 1); + if (data == NULL) { + qError("%s failed to malloc memory, size:%d", GET_TASKID(pTaskInfo), kvRowLen(tmp) + 1); + longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } + *data = TSDB_DATA_TYPE_JSON; - memcpy(data+1, tmp, kvRowLen(tmp)); + memcpy(data + 1, tmp, kvRowLen(tmp)); colDataAppend(pDst, count, data, false); taosMemoryFree(data); - }else{ + } else { const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId); colDataAppend(pDst, count, p, (p == NULL)); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 588c3e90e7..6c69647691 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -782,13 +782,11 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { int32_t scanFlag = MAIN_SCAN; + int64_t st = taosGetTimestampUs(); SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { break; } @@ -821,6 +819,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { closeAllResultRows(&pInfo->binfo.resultRowInfo); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order); OPTR_SET_OPENED(pOperator); + + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; return TSDB_CODE_SUCCESS; } @@ -946,10 +946,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { break; } @@ -998,7 +995,10 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - return pBlock->info.rows == 0 ? NULL : pBlock; + size_t rows = pBlock->info.rows; + pOperator->resultInfo.totalRows += rows; + + return (rows == 0)? NULL:pBlock; } } @@ -1092,10 +1092,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SArray* pUpdated = NULL; while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { break; } @@ -1425,9 +1422,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } @@ -1472,9 +1467,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } @@ -1702,12 +1695,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } while (1) { - publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); if (pBlock->info.type == STREAM_REPROCESS) { doClearWindows(&pInfo->aggSup, &pInfo->binfo, &pInfo->interval, -- GitLab