未验证 提交 98cee379 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20033 from taosdata/fix/TD-22556

fix: refact client statistics collection
...@@ -97,16 +97,14 @@ typedef struct { ...@@ -97,16 +97,14 @@ typedef struct {
typedef struct SQueryExecMetric { typedef struct SQueryExecMetric {
int64_t start; // start timestamp, us int64_t start; // start timestamp, us
int64_t syntaxStart; // start to parse, us
int64_t syntaxEnd; // end to parse, us
int64_t ctgStart; // start to parse, us int64_t ctgStart; // start to parse, us
int64_t ctgEnd; // end to parse, us int64_t execStart; // start to parse, us
int64_t semanticEnd;
int64_t planEnd; int64_t parseCostUs;
int64_t resultReady; int64_t ctgCostUs;
int64_t execEnd; int64_t analyseCostUs;
int64_t send; // start to send to server, us int64_t planCostUs;
int64_t rsp; // receive response from server, us int64_t execCostUs;
} SQueryExecMetric; } SQueryExecMetric;
struct SAppInstInfo { struct SAppInstInfo {
......
...@@ -79,23 +79,22 @@ static void deregisterRequest(SRequestObj *pRequest) { ...@@ -79,23 +79,22 @@ static void deregisterRequest(SRequestObj *pRequest) {
"current:%d, app current:%d", "current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->stmtType) { if (pRequest->pQuery && pRequest->pQuery->pRoot) {
// tscPerf("insert duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->pQuery->pRoot->type && (0 == ((SVnodeModifOpStmt*)pRequest->pQuery->pRoot)->sqlNodeType)) {
// "us, exec:%" PRId64 "us", tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
// pRequest->metric.ctgEnd, pRequest->metric.execEnd - pRequest->metric.semanticEnd); pRequest->metric.planCostUs, pRequest->metric.execCostUs);
atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
// tscPerf("select duration %" PRId64 "us: syntax:%" PRId64 "us, ctg:%" PRId64 "us, semantic:%" PRId64 tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
// "us, planner:%" PRId64 "us, exec:%" PRId64 "us, reqId:0x%" PRIx64, "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
// duration, pRequest->metric.syntaxEnd - pRequest->metric.syntaxStart, duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
// pRequest->metric.ctgEnd - pRequest->metric.ctgStart, pRequest->metric.semanticEnd - pRequest->metric.planCostUs, pRequest->metric.execCostUs);
// pRequest->metric.ctgEnd, pRequest->metric.planEnd - pRequest->metric.semanticEnd,
// pRequest->metric.resultReady - pRequest->metric.planEnd, pRequest->requestId);
atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration); atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
} }
}
if (duration >= SLOW_QUERY_INTERVAL) { if (duration >= SLOW_QUERY_INTERVAL) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
...@@ -362,8 +361,6 @@ void doDestroyRequest(void *p) { ...@@ -362,8 +361,6 @@ void doDestroyRequest(void *p) {
taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->dbList);
taosArrayDestroy(pRequest->targetTableList); taosArrayDestroy(pRequest->targetTableList);
qDestroyQuery(pRequest->pQuery);
nodesDestroyAllocator(pRequest->allocatorRefId);
destroyQueryExecRes(&pRequest->body.resInfo.execRes); destroyQueryExecRes(&pRequest->body.resInfo.execRes);
...@@ -378,6 +375,9 @@ void doDestroyRequest(void *p) { ...@@ -378,6 +375,9 @@ void doDestroyRequest(void *p) {
taosMemoryFree(pRequest->body.param); taosMemoryFree(pRequest->body.param);
} }
qDestroyQuery(pRequest->pQuery);
nodesDestroyAllocator(pRequest->allocatorRefId);
taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFree(pRequest); taosMemoryFree(pRequest);
tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest); tscTrace("end to destroy request %" PRIx64 " p:%p", reqId, pRequest);
......
...@@ -946,7 +946,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { ...@@ -946,7 +946,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
removeMeta(pTscObj, pRequest->targetTableList); removeMeta(pTscObj, pRequest->targetTableList);
} }
pRequest->metric.execEnd = taosGetTimestampUs(); pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
int32_t code1 = handleQueryExecRsp(pRequest); int32_t code1 = handleQueryExecRsp(pRequest);
if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) { if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
pRequest->code = code1; pRequest->code = code1;
...@@ -1072,11 +1072,10 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat ...@@ -1072,11 +1072,10 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
pRequest->body.subplanNum = pDag->numOfSubplans; pRequest->body.subplanNum = pDag->numOfSubplans;
} }
pRequest->metric.planEnd = taosGetTimestampUs(); pRequest->metric.execStart = taosGetTimestampUs();
if (code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " create query plan success, elapsed time:%.2f ms, 0x%" PRIx64, pRequest->self, pRequest->metric.planCostUs = pRequest->metric.execStart - st;
(pRequest->metric.planEnd - st) / 1000.0, pRequest->requestId);
}
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
SArray* pNodeList = NULL; SArray* pNodeList = NULL;
if (QUERY_NODE_VNODE_MODIF_STMT != nodeType(pQuery->pRoot)) { if (QUERY_NODE_VNODE_MODIF_STMT != nodeType(pQuery->pRoot)) {
...@@ -1124,6 +1123,16 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM ...@@ -1124,6 +1123,16 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
} }
if (pQuery->pRoot && !pRequest->inRetry) {
STscObj* pTscObj = pRequest->pTscObj;
SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
if (QUERY_NODE_VNODE_MODIF_STMT == pQuery->pRoot->type && (0 == ((SVnodeModifOpStmt*)pQuery->pRoot)->sqlNodeType)) {
atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
}
}
switch (pQuery->execMode) { switch (pQuery->execMode) {
case QUERY_EXEC_MODE_LOCAL: case QUERY_EXEC_MODE_LOCAL:
asyncExecLocalCmd(pRequest, pQuery); asyncExecLocalCmd(pRequest, pQuery);
...@@ -1393,21 +1402,7 @@ int32_t doProcessMsgFromServer(void* param) { ...@@ -1393,21 +1402,7 @@ int32_t doProcessMsgFromServer(void* param) {
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
if (pRequest) { if (pRequest) {
assert(pRequest->self == pSendInfo->requestObjRefId); assert(pRequest->self == pSendInfo->requestObjRefId);
pRequest->metric.rsp = taosGetTimestampUs();
pTscObj = pRequest->pTscObj; pTscObj = pRequest->pTscObj;
/*
* There is not response callback function for submit response.
* The actual inserted number of points is the first number.
*/
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
} else {
tscError("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
}
} }
} }
......
...@@ -707,7 +707,8 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t ...@@ -707,7 +707,8 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
SRequestObj *pRequest = pWrapper->pRequest; SRequestObj *pRequest = pWrapper->pRequest;
SQuery *pQuery = pRequest->pQuery; SQuery *pQuery = pRequest->pQuery;
pRequest->metric.ctgEnd = taosGetTimestampUs(); int64_t analyseStart = taosGetTimestampUs();
pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart;
qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId); qDebug("0x%" PRIx64 " start to semantic analysis, reqId:0x%" PRIx64, pRequest->self, pRequest->requestId);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
...@@ -718,7 +719,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t ...@@ -718,7 +719,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
} }
} }
pRequest->metric.semanticEnd = taosGetTimestampUs(); pRequest->metric.analyseCostUs = taosGetTimestampUs() - analyseStart;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (pQuery->haveResultSet) { if (pQuery->haveResultSet) {
...@@ -730,10 +731,6 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t ...@@ -730,10 +731,6 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
TSWAP(pRequest->tableList, (pQuery)->pTableList); TSWAP(pRequest->tableList, (pQuery)->pTableList);
TSWAP(pRequest->targetTableList, (pQuery)->pTargetTableList); TSWAP(pRequest->targetTableList, (pQuery)->pTargetTableList);
double el = (pRequest->metric.semanticEnd - pRequest->metric.ctgEnd) / 1000.0;
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, elapsed time:%.2f ms, reqId:0x%" PRIx64,
pRequest->self, el, pRequest->requestId);
launchAsyncQuery(pRequest, pQuery, pResultMeta, pWrapper); launchAsyncQuery(pRequest, pQuery, pResultMeta, pWrapper);
} else { } else {
destorySqlCallbackWrapper(pWrapper); destorySqlCallbackWrapper(pWrapper);
...@@ -798,7 +795,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c ...@@ -798,7 +795,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
SRequestObj *pRequest = pWrapper->pRequest; SRequestObj *pRequest = pWrapper->pRequest;
SQuery *pQuery = pRequest->pQuery; SQuery *pQuery = pRequest->pQuery;
pRequest->metric.ctgEnd = taosGetTimestampUs(); pRequest->metric.ctgCostUs += taosGetTimestampUs() - pRequest->metric.ctgStart;
qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId, qDebug("0x%" PRIx64 " start to continue parse, reqId:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId,
tstrerror(code)); tstrerror(code));
...@@ -910,7 +907,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -910,7 +907,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
pRequest->metric.syntaxStart = taosGetTimestampUs(); int64_t syntaxStart = taosGetTimestampUs();
pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq)); pWrapper->pCatalogReq = taosMemoryCalloc(1, sizeof(SCatalogReq));
if (pWrapper->pCatalogReq == NULL) { if (pWrapper->pCatalogReq == NULL) {
...@@ -921,16 +918,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -921,16 +918,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
code = qParseSqlSyntax(pWrapper->pParseCtx, &pRequest->pQuery, pWrapper->pCatalogReq); code = qParseSqlSyntax(pWrapper->pParseCtx, &pRequest->pQuery, pWrapper->pCatalogReq);
} }
pRequest->metric.syntaxEnd = taosGetTimestampUs(); pRequest->metric.parseCostUs += taosGetTimestampUs() - syntaxStart;
}
if (TSDB_CODE_SUCCESS == code && !updateMetaForce) {
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
if (QUERY_NODE_INSERT_STMT == nodeType(pRequest->pQuery->pRoot)) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfInsertsReq, 1);
} else if (QUERY_NODE_SELECT_STMT == nodeType(pRequest->pQuery->pRoot)) {
atomic_add_fetch_64((int64_t *)&pActivity->numOfQueryReq, 1);
}
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -961,7 +949,6 @@ static void fetchCallback(void *pResult, void *param, int32_t code) { ...@@ -961,7 +949,6 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
SRequestObj *pRequest = (SRequestObj *)param; SRequestObj *pRequest = (SRequestObj *)param;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo; SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pRequest->metric.resultReady = taosGetTimestampUs();
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
tstrerror(code), pRequest->requestId); tstrerror(code), pRequest->requestId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册