diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 48efe89361ac308ad20e6eca05a498e75b69c572..9225d671315723038cb61a0f21735103c3a64466 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -66,7 +66,7 @@ enum { typedef struct SAppInstInfo SAppInstInfo; typedef struct { - char* key; + char* key; // statistics int32_t reportCnt; int32_t connKeyCnt; @@ -118,6 +118,7 @@ struct SAppInstInfo { uint64_t clusterId; void* pTransporter; SAppHbMgr* pAppHbMgr; + char* instKey; }; typedef struct SAppInfo { @@ -336,6 +337,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp); // cluster level SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key); void appHbMgrCleanup(void); +void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr); // conn level int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index cad1b4b1cf25495ac82a44d9a4d42d678cbf044d..f5b5c48d470a32a49a38bb1b2652fafd4f65435f 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -79,13 +79,13 @@ static void deregisterRequest(SRequestObj *pRequest) { } // todo close the transporter properly -void closeTransporter(STscObj *pTscObj) { - if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) { +void closeTransporter(SAppInstInfo *pAppInfo) { + if (pAppInfo == NULL || pAppInfo->pTransporter == NULL) { return; } - tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id); - rpcClose(pTscObj->pAppInfo->pTransporter); + tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo); + rpcClose(pAppInfo->pTransporter); } static bool clientRpcRfp(int32_t code) { @@ -130,6 +130,21 @@ void closeAllRequests(SHashObj *pRequests) { } } +void destroyAppInst(SAppInstInfo* pAppInfo) { + tscDebug("destroy app inst mgr %p", pAppInfo); + + hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr); + taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey)); + taosMemoryFreeClear(pAppInfo->instKey); + closeTransporter(pAppInfo); + + taosThreadMutexLock(&pAppInfo->qnodeMutex); + taosArrayDestroy(pAppInfo->pQnodeList); + taosThreadMutexUnlock(&pAppInfo->qnodeMutex); + + taosMemoryFree(pAppInfo); +} + void destroyTscObj(void *pObj) { STscObj *pTscObj = pObj; @@ -138,11 +153,12 @@ void destroyTscObj(void *pObj) { int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); closeAllRequests(pTscObj->pRequests); schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter); + tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj, + pTscObj->pAppInfo->numOfConns); + if (0 == connNum) { - closeTransporter(pTscObj); + destroyAppInst(pTscObj->pAppInfo); } - tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj, - pTscObj->pAppInfo->numOfConns); taosThreadMutexDestroy(&pTscObj->mutex); taosMemoryFreeClear(pTscObj); } @@ -174,6 +190,8 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c pObj->id = taosAddRef(clientConnRefPool, pObj); pObj->schemalessType = 1; + atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1); + tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj); return pObj; } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 0681b5d0a5dc970c835a9a559a9e7e74a75f86be..4f4bb83fe5b0fe3d8ba0143be46790d7b9674e0c 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -790,22 +790,40 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) { return pAppHbMgr; } +void hbFreeAppHbMgr(SAppHbMgr *pTarget) { + void *pIter = taosHashIterate(pTarget->activeInfo, NULL); + while (pIter != NULL) { + SClientHbReq *pOneReq = pIter; + tFreeClientHbReq(pOneReq); + pIter = taosHashIterate(pTarget->activeInfo, pIter); + } + taosHashCleanup(pTarget->activeInfo); + pTarget->activeInfo = NULL; + + taosMemoryFree(pTarget->key); + taosMemoryFree(pTarget); +} + +void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) { + taosThreadMutexLock(&clientHbMgr.lock); + int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs); + for (int32_t i = 0; i < mgrSize; ++i) { + SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i); + if (pItem == *pAppHbMgr) { + hbFreeAppHbMgr(*pAppHbMgr); + *pAppHbMgr = NULL; + taosArrayRemove(clientHbMgr.appHbMgrs, i); + break; + } + } + taosThreadMutexUnlock(&clientHbMgr.lock); +} + void appHbMgrCleanup(void) { int sz = taosArrayGetSize(clientHbMgr.appHbMgrs); for (int i = 0; i < sz; i++) { SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i); - - void *pIter = taosHashIterate(pTarget->activeInfo, NULL); - while (pIter != NULL) { - SClientHbReq *pOneReq = pIter; - tFreeClientHbReq(pOneReq); - pIter = taosHashIterate(pTarget->activeInfo, pIter); - } - taosHashCleanup(pTarget->activeInfo); - pTarget->activeInfo = NULL; - - taosMemoryFree(pTarget->key); - taosMemoryFree(pTarget); + hbFreeAppHbMgr(pTarget); } } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index d3510b91ae7fc3d606823cbc8ff8c61835c1ba3a..8c6304632366acdc272eea1f016eaa6e70973b2c 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -122,7 +122,10 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); p->pAppHbMgr = appHbMgrInit(p, key); taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); - + p->instKey = key; + key = NULL; + tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, ip, port); + pInst = &p; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 818762eff6a8f75e30100e2ade15cf943e98563c..5c30df4ae2448f3443b4f32b92e23c8ebc96e4df 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -87,7 +87,6 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { // update the appInstInfo pTscObj->pAppInfo->clusterId = connectRsp.clusterId; - atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); pTscObj->connType = connectRsp.connType; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 6184d13533eb2b6e0c05a031e009dcd1d5a4c059..139539821c34673391622831578a00e212bc8fe1 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -789,6 +789,8 @@ _return: int32_t ctgCallUserCb(void* param) { SCtgJob* pJob = (SCtgJob*)param; + + //taosSsleep(2); (*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode); diff --git a/tests/script/api/stopquery.c b/tests/script/api/stopquery.c index 4c7964c98350f1d24bb6c76ffdb5aef5746e1933..0008aa3303d9a6cb377323791f31cdbdc755a95e 100644 --- a/tests/script/api/stopquery.c +++ b/tests/script/api/stopquery.c @@ -368,6 +368,7 @@ void *closeThreadFp(void *arg) { while (true) { if (qParam->taos) { usleep(rand() % 10000); + //usleep(1000000); taos_close(qParam->taos); break; }