未验证 提交 ed44b72c 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #6628 from taosdata/feature/td-4561

[TD-4561]feature:support statistics about operators running time
...@@ -898,7 +898,9 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -898,7 +898,9 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while(1) { while(1) {
bool prev = *newgroup; bool prev = *newgroup;
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = upstream->exec(upstream, newgroup); pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
*newgroup = prev; *newgroup = prev;
break; break;
...@@ -966,7 +968,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -966,7 +968,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) { if (pInfo->currentGroupOffset == 0) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -974,7 +978,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -974,7 +978,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks while ((*newgroup) == false) { // ignore the remain blocks
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -986,7 +992,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -986,7 +992,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock; return pBlock;
} }
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -1000,7 +1009,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -1000,7 +1009,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
} }
while ((*newgroup) == false) { while ((*newgroup) == false) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
......
...@@ -825,7 +825,10 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup) ...@@ -825,7 +825,10 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup)
SJoinStatus* pStatus = &pJoinInfo->status[i]; SJoinStatus* pStatus = &pJoinInfo->status[i];
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) { if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream); tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup); pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
pStatus->index = 0; pStatus->index = 0;
if (pStatus->pBlock == NULL) { if (pStatus->pBlock == NULL) {
......
...@@ -133,6 +133,28 @@ typedef struct STableQueryInfo { ...@@ -133,6 +133,28 @@ typedef struct STableQueryInfo {
SResultRowInfo resInfo; SResultRowInfo resInfo;
} STableQueryInfo; } STableQueryInfo;
typedef enum {
QUERY_PROF_BEFORE_OPERATOR_EXEC = 0,
QUERY_PROF_AFTER_OPERATOR_EXEC,
QUERY_PROF_QUERY_ABORT
} EQueryProfEventType;
typedef struct {
EQueryProfEventType eventType;
int64_t eventTime;
union {
uint8_t operatorType; //for operator event
int32_t abortCode; //for query abort event
};
} SQueryProfEvent;
typedef struct {
uint8_t operatorType;
int64_t sumSelfTime;
int64_t sumRunTimes;
} SOperatorProfResult;
typedef struct SQueryCostInfo { typedef struct SQueryCostInfo {
uint64_t loadStatisTime; uint64_t loadStatisTime;
uint64_t loadFileBlockTime; uint64_t loadFileBlockTime;
...@@ -154,6 +176,9 @@ typedef struct SQueryCostInfo { ...@@ -154,6 +176,9 @@ typedef struct SQueryCostInfo {
uint64_t tableInfoSize; uint64_t tableInfoSize;
uint64_t hashSize; uint64_t hashSize;
uint64_t numOfTimeWindows; uint64_t numOfTimeWindows;
SArray* queryProfEvents; //SArray<SQueryProfEvent>
SHashObj* operatorProfResults; //map<operator_type, SQueryProfEvent>
} SQueryCostInfo; } SQueryCostInfo;
typedef struct { typedef struct {
...@@ -587,7 +612,12 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data); ...@@ -587,7 +612,12 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo); void setQueryKilled(SQInfo *pQInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo); void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo); void freeQInfo(SQInfo *pQInfo);
void freeQueryAttr(SQueryAttr *pQuery); void freeQueryAttr(SQueryAttr *pQuery);
......
...@@ -3785,6 +3785,88 @@ int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutp ...@@ -3785,6 +3785,88 @@ int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutp
return pOutput->info.rows; return pOutput->info.rows;
} }
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType) {
SQueryProfEvent event;
event.eventType = eventType;
event.eventTime = taosGetTimestampUs();
event.operatorType = operatorInfo->operatorType;
SQInfo* qInfo = operatorInfo->pRuntimeEnv->qinfo;
taosArrayPush(qInfo->summary.queryProfEvents, &event);
}
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code) {
SQueryProfEvent event;
event.eventType = QUERY_PROF_QUERY_ABORT;
event.eventTime = taosGetTimestampUs();
event.abortCode = code;
taosArrayPush(pQInfo->summary.queryProfEvents, &event);
}
typedef struct {
uint8_t operatorType;
int64_t beginTime;
int64_t endTime;
int64_t selfTime;
int64_t descendantsTime;
} SOperatorStackItem;
static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* event, SArray* opStack, SHashObj* profResults) {
item->endTime = event->eventTime;
item->selfTime = (item->endTime - item->beginTime) - (item->descendantsTime);
for (int32_t j = 0; j < taosArrayGetSize(opStack); ++j) {
SOperatorStackItem* ancestor = taosArrayGet(opStack, j);
ancestor->descendantsTime += item->selfTime;
}
uint8_t operatorType = item->operatorType;
SOperatorProfResult* result = taosHashGet(profResults, &operatorType, sizeof(operatorType));
if (result != NULL) {
result->sumRunTimes++;
result->sumSelfTime += item->selfTime;
} else {
SOperatorProfResult opResult;
opResult.operatorType = operatorType;
opResult.sumSelfTime = item->selfTime;
opResult.sumRunTimes = 1;
taosHashPut(profResults, &(operatorType), sizeof(operatorType),
&opResult, sizeof(opResult));
}
}
void calculateOperatorProfResults(SQInfo* pQInfo) {
SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem));
if (opStack == NULL) {
return;
}
size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents);
SHashObj* profResults = pQInfo->summary.operatorProfResults;
for (int i = 0; i < size; ++i) {
SQueryProfEvent* event = taosArrayGet(pQInfo->summary.queryProfEvents, i);
if (event->eventType == QUERY_PROF_BEFORE_OPERATOR_EXEC) {
SOperatorStackItem opItem;
opItem.operatorType = event->operatorType;
opItem.beginTime = event->eventTime;
opItem.descendantsTime = 0;
taosArrayPush(opStack, &opItem);
} else if (event->eventType == QUERY_PROF_AFTER_OPERATOR_EXEC) {
SOperatorStackItem* item = taosArrayPop(opStack);
assert(item->operatorType == event->operatorType);
doOperatorExecProfOnce(item, event, opStack, profResults);
} else if (event->eventType == QUERY_PROF_QUERY_ABORT) {
SOperatorStackItem* item;
while ((item = taosArrayPop(opStack)) != NULL) {
doOperatorExecProfOnce(item, event, opStack, profResults);
}
}
}
taosArrayDestroy(opStack);
}
void queryCostStatis(SQInfo *pQInfo) { void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pQInfo->summary; SQueryCostInfo *pSummary = &pQInfo->summary;
...@@ -3805,6 +3887,8 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3805,6 +3887,8 @@ void queryCostStatis(SQInfo *pQInfo) {
pSummary->numOfTimeWindows = 0; pSummary->numOfTimeWindows = 0;
} }
calculateOperatorProfResults(pQInfo);
qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, " qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64, "load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis, pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
...@@ -3812,6 +3896,13 @@ void queryCostStatis(SQInfo *pQInfo) { ...@@ -3812,6 +3896,13 @@ void queryCostStatis(SQInfo *pQInfo) {
qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0, qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0); pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
SOperatorProfResult* opRes = taosHashIterate(pSummary->operatorProfResults, NULL);
while (opRes != NULL) {
qDebug("QInfo:0x%"PRIx64" :cost summary: operator : %d, exec times: %"PRId64", self time: %"PRId64, pQInfo->qId,
opRes->operatorType, opRes->sumRunTimes, opRes->sumSelfTime );
opRes = taosHashIterate(pSummary->operatorProfResults, opRes);
}
} }
//static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) { //static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
...@@ -4213,6 +4304,9 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr ...@@ -4213,6 +4304,9 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
// create runtime environment // create runtime environment
int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables; int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo)); pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
pQInfo->summary.queryProfEvents = taosArrayInit(512, sizeof(SQueryProfEvent));
pQInfo->summary.operatorProfResults =
taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK);
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param); code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -4837,7 +4931,10 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) { ...@@ -4837,7 +4931,10 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0]; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -4892,7 +4989,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { ...@@ -4892,7 +4989,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0]; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -4972,7 +5072,10 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) { ...@@ -4972,7 +5072,10 @@ static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
bool prevVal = *newgroup; bool prevVal = *newgroup;
// The upstream exec may change the value of the newgroup, so use a local variable instead. // The upstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
assert(*newgroup == false); assert(*newgroup == false);
...@@ -5032,7 +5135,10 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) { ...@@ -5032,7 +5135,10 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (1) { while (1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -5082,7 +5188,10 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { ...@@ -5082,7 +5188,10 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) { while (1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -5127,7 +5236,10 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) { ...@@ -5127,7 +5236,10 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0]; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -5180,7 +5292,10 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { ...@@ -5180,7 +5292,10 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0]; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -5308,7 +5423,10 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) { ...@@ -5308,7 +5423,10 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
STimeWindow win = pQueryAttr->window; STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0]; SOperatorInfo* upstream = pOperator->upstream[0];
while (1) { while (1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -5366,7 +5484,9 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) { ...@@ -5366,7 +5484,9 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0]; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -5417,7 +5537,9 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -5417,7 +5537,9 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0]; SOperatorInfo* upstream = pOperator->upstream[0];
while(1) { while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup); SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
break; break;
} }
...@@ -5483,7 +5605,10 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -5483,7 +5605,10 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
} }
while(1) { while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) { if (*newgroup) {
assert(pBlock != NULL); assert(pBlock != NULL);
} }
...@@ -6153,7 +6278,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -6153,7 +6278,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pRes->info.rows = 0; pRes->info.rows = 0;
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while(1) { while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -7479,6 +7607,9 @@ void freeQInfo(SQInfo *pQInfo) { ...@@ -7479,6 +7607,9 @@ void freeQInfo(SQInfo *pQInfo) {
tfree(pQInfo->pBuf); tfree(pQInfo->pBuf);
tfree(pQInfo->sql); tfree(pQInfo->sql);
taosArrayDestroy(pQInfo->summary.queryProfEvents);
taosHashCleanup(pQInfo->summary.operatorProfResults);
taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows); taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows);
pQInfo->signature = 0; pQInfo->signature = 0;
......
...@@ -232,6 +232,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { ...@@ -232,6 +232,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
// error occurs, record the error code and return to client // error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env); int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pQInfo, ret);
pQInfo->code = ret; pQInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code)); qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo); return doBuildResCheck(pQInfo);
...@@ -240,7 +241,9 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { ...@@ -240,7 +241,9 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId); qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId);
bool newgroup = false; bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup); pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv); pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
if (isQueryKilled(pQInfo)) { if (isQueryKilled(pQInfo)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册