提交 ff48ffd8 编写于 作者: S Shenglian Zhou 提交者: shenglian zhou

[TD-4561]feature:support statistics about operators running time

......@@ -898,7 +898,9 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while(1) {
bool prev = *newgroup;
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
*newgroup = prev;
break;
......@@ -966,7 +968,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -974,7 +978,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -986,7 +992,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock;
}
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -1000,7 +1009,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
}
while ((*newgroup) == false) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......
......@@ -825,7 +825,10 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup)
SJoinStatus* pStatus = &pJoinInfo->status[i];
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
pStatus->index = 0;
if (pStatus->pBlock == NULL) {
......
......@@ -133,6 +133,28 @@ typedef struct STableQueryInfo {
SResultRowInfo resInfo;
} STableQueryInfo;
typedef enum {
QUERY_PROF_BEFORE_OPERATOR_EXEC = 0,
QUERY_PROF_AFTER_OPERATOR_EXEC,
QUERY_PROF_QUERY_ABORT
} EQueryProfEventType;
typedef struct {
EQueryProfEventType eventType;
int64_t eventTime;
union {
uint8_t operatorType; //for operator event
int32_t abortCode; //for query abort event
};
} SQueryProfEvent;
typedef struct {
uint8_t operatorType;
int64_t sumSelfTime;
int64_t sumRunTimes;
} SOperatorProfResult;
typedef struct SQueryCostInfo {
uint64_t loadStatisTime;
uint64_t loadFileBlockTime;
......@@ -154,6 +176,9 @@ typedef struct SQueryCostInfo {
uint64_t tableInfoSize;
uint64_t hashSize;
uint64_t numOfTimeWindows;
SArray* queryProfEvents; //SArray<SQueryProfEvent>
SHashObj* operatorProfResults; //map<operator_type, SQueryProfEvent>
} SQueryCostInfo;
typedef struct {
......@@ -586,7 +611,12 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo);
void freeQueryAttr(SQueryAttr *pQuery);
......
......@@ -3791,6 +3791,88 @@ int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutp
return pOutput->info.rows;
}
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType) {
SQueryProfEvent event;
event.eventType = eventType;
event.eventTime = taosGetTimestampUs();
event.operatorType = operatorInfo->operatorType;
SQInfo* qInfo = operatorInfo->pRuntimeEnv->qinfo;
taosArrayPush(qInfo->summary.queryProfEvents, &event);
}
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code) {
SQueryProfEvent event;
event.eventType = QUERY_PROF_QUERY_ABORT;
event.eventTime = taosGetTimestampUs();
event.abortCode = code;
taosArrayPush(pQInfo->summary.queryProfEvents, &event);
}
typedef struct {
uint8_t operatorType;
int64_t beginTime;
int64_t endTime;
int64_t selfTime;
int64_t descendantsTime;
} SOperatorStackItem;
static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* event, SArray* opStack, SHashObj* profResults) {
item->endTime = event->eventTime;
item->selfTime = (item->endTime - item->beginTime) - (item->descendantsTime);
for (int32_t j = 0; j < taosArrayGetSize(opStack); ++j) {
SOperatorStackItem* ancestor = taosArrayGet(opStack, j);
ancestor->descendantsTime += item->selfTime;
}
uint8_t operatorType = item->operatorType;
SOperatorProfResult* result = taosHashGet(profResults, &operatorType, sizeof(operatorType));
if (result != NULL) {
result->sumRunTimes++;
result->sumSelfTime += item->selfTime;
} else {
SOperatorProfResult opResult;
opResult.operatorType = operatorType;
opResult.sumSelfTime = item->selfTime;
opResult.sumRunTimes = 1;
taosHashPut(profResults, &(operatorType), sizeof(operatorType),
&opResult, sizeof(opResult));
}
}
void calculateOperatorProfResults(SQInfo* pQInfo) {
SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem));
if (opStack == NULL) {
return;
}
size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents);
SHashObj* profResults = pQInfo->summary.operatorProfResults;
for (int i = 0; i < size; ++i) {
SQueryProfEvent* event = taosArrayGet(pQInfo->summary.queryProfEvents, i);
if (event->eventType == QUERY_PROF_BEFORE_OPERATOR_EXEC) {
SOperatorStackItem opItem;
opItem.operatorType = event->operatorType;
opItem.beginTime = event->eventTime;
opItem.descendantsTime = 0;
taosArrayPush(opStack, &opItem);
} else if (event->eventType == QUERY_PROF_AFTER_OPERATOR_EXEC) {
SOperatorStackItem* item = taosArrayPop(opStack);
assert(item->operatorType == event->operatorType);
doOperatorExecProfOnce(item, event, opStack, profResults);
} else if (event->eventType == QUERY_PROF_QUERY_ABORT) {
SOperatorStackItem* item;
while ((item = taosArrayPop(opStack)) != NULL) {
doOperatorExecProfOnce(item, event, opStack, profResults);
}
}
}
taosArrayDestroy(opStack);
}
void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pQInfo->summary;
......@@ -3811,6 +3893,8 @@ void queryCostStatis(SQInfo *pQInfo) {
pSummary->numOfTimeWindows = 0;
}
calculateOperatorProfResults(pQInfo);
qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
......@@ -3818,6 +3902,13 @@ void queryCostStatis(SQInfo *pQInfo) {
qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
SOperatorProfResult* opRes = taosHashIterate(pSummary->operatorProfResults, NULL);
while (opRes != NULL) {
qDebug("QInfo:0x%"PRIx64" :cost summary: operator : %d, exec times: %"PRId64", self time: %"PRId64, pQInfo->qId,
opRes->operatorType, opRes->sumRunTimes, opRes->sumSelfTime );
opRes = taosHashIterate(pSummary->operatorProfResults, opRes);
}
}
//static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
......@@ -4219,6 +4310,9 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
// create runtime environment
int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
pQInfo->summary.queryProfEvents = taosArrayInit(512, sizeof(SQueryProfEvent));
pQInfo->summary.operatorProfResults =
taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK);
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param);
if (code != TSDB_CODE_SUCCESS) {
......@@ -4843,7 +4937,10 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -4898,7 +4995,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -4978,7 +5078,10 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
bool prevVal = *newgroup;
// The upstream exec may change the value of the newgroup, so use a local variable instead.
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
assert(*newgroup == false);
......@@ -5038,7 +5141,10 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while (1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -5088,7 +5194,10 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5133,7 +5242,10 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5186,7 +5298,10 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5314,7 +5429,10 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
while (1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5372,7 +5490,9 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5423,7 +5543,9 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
......@@ -5489,7 +5611,10 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
}
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (*newgroup) {
assert(pBlock != NULL);
}
......@@ -6159,7 +6284,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -7483,6 +7611,9 @@ void freeQInfo(SQInfo *pQInfo) {
tfree(pQInfo->pBuf);
tfree(pQInfo->sql);
taosArrayDestroy(pQInfo->summary.queryProfEvents);
taosHashCleanup(pQInfo->summary.operatorProfResults);
taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows);
pQInfo->signature = 0;
......
......@@ -232,6 +232,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
// error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pQInfo, ret);
pQInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo);
......@@ -240,7 +241,9 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId);
bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册