From 4fd1d1f17a187f12404e169db97415a3a8a9be75 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 2 Aug 2021 19:03:29 +0800 Subject: [PATCH] [td-225]tracking the total number of qhandle in dnode. --- src/vnode/inc/vnodeInt.h | 1 + src/vnode/src/vnodeRead.c | 29 +++++++++++++++++++++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/vnode/inc/vnodeInt.h b/src/vnode/inc/vnodeInt.h index ef05cf4a40..4864b79dc4 100644 --- a/src/vnode/inc/vnodeInt.h +++ b/src/vnode/inc/vnodeInt.h @@ -26,6 +26,7 @@ extern "C" { #include "vnode.h" extern int32_t vDebugFlag; +extern int32_t vNumOfExistedQHandle; // current initialized and existed query handle in current dnode #define vFatal(...) { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", 255, __VA_ARGS__); }} #define vError(...) { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", 255, __VA_ARGS__); }} diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 6bc009209b..64f87ba5ca 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -21,6 +21,8 @@ #include "query.h" #include "vnodeStatus.h" +int32_t vNumOfExistedQHandle; // current initialized and existed query handle in current dnode + static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead); @@ -247,7 +249,8 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (handle == NULL) { // failed to register qhandle pRsp->code = terrno; terrno = 0; - vError("vgId:%d, QInfo:0x%"PRIx64 "-%p register qhandle failed, return to app, code:%s", pVnode->vgId, qId, (void *)pQInfo, + + vError("vgId:%d, QInfo:0x%"PRIx64 "-%p register qhandle failed, return to app, code:%s,", pVnode->vgId, qId, (void *)pQInfo, tstrerror(pRsp->code)); qDestroyQueryInfo(pQInfo); // destroy it directly return pRsp->code; @@ -260,10 +263,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { vnodeNotifyCurrentQhandle(pRead->rpcHandle, qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { vError("vgId:%d, QInfo:0x%"PRIx64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle, pRead->rpcHandle); + pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); return pRsp->code; } + } else { assert(pQInfo == NULL); } @@ -277,6 +282,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { return pRsp->code; } } + + int32_t remain = atomic_add_fetch_32(&vNumOfExistedQHandle, 1); + vTrace("vgId:%d, new qhandle created, total qhandle:%d", pVnode->vgId, remain); } else { assert(pCont != NULL); void **qhandle = (void **)pRead->qhandle; @@ -318,8 +326,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle. // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle if (freehandle || (!buildRes)) { + if (freehandle) { + int32_t remain = atomic_sub_fetch_32(&vNumOfExistedQHandle, 1); + vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *qhandle, remain); + } + qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); } + } } @@ -357,7 +371,10 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // kill current query and free corresponding resources. if (pRetrieve->free == 1) { - vWarn("vgId:%d, QInfo:%"PRIx64 "-%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, pRetrieve->qId, *handle); + int32_t remain = atomic_sub_fetch_32(&vNumOfExistedQHandle, 1); + vWarn("vgId:%d, QInfo:%"PRIx64 "-%p, retrieve msg received to kill query and free qhandle, remain qhandle:%d", pVnode->vgId, pRetrieve->qId, + *handle, remain); + qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -368,7 +385,10 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // register the qhandle to connect to quit query immediate if connection is broken if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, conn:%p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle); + int32_t remain = atomic_sub_fetch_32(&vNumOfExistedQHandle, 1); + vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, conn:%p, remain qhandle:%d", pVnode->vgId, pRetrieve->qhandle, + *handle, pRead->rpcHandle, remain); + code = TSDB_CODE_RPC_NETWORK_UNAVAIL; qKillQuery(*handle); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); @@ -390,7 +410,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (!tsRetrieveBlockingModel) { if (!buildRes) { assert(pRead->rpcHandle != NULL); - qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); return TSDB_CODE_QRY_NOT_READY; } @@ -403,6 +422,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { // If qhandle is not added into vread queue, the query should be completed already or paused with error. // Here free qhandle immediately if (freeHandle) { + int32_t remain = atomic_sub_fetch_32(&vNumOfExistedQHandle, 1); + vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *handle, remain); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); } -- GitLab