提交 de06f055 编写于 作者: D dapan1121

enh: stop query

上级 73dfd117
......@@ -66,7 +66,7 @@ enum {
typedef struct SAppInstInfo SAppInstInfo;
typedef struct {
char* key;
char* key;
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
......@@ -118,6 +118,7 @@ struct SAppInstInfo {
uint64_t clusterId;
void* pTransporter;
SAppHbMgr* pAppHbMgr;
char* instKey;
};
typedef struct SAppInfo {
......@@ -336,6 +337,7 @@ int hbHandleRsp(SClientHbBatchRsp* hbRsp);
// cluster level
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void appHbMgrCleanup(void);
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr);
// conn level
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int64_t tscRefId, int64_t clusterId, int8_t connType);
......
......@@ -79,13 +79,13 @@ static void deregisterRequest(SRequestObj *pRequest) {
}
// todo close the transporter properly
void closeTransporter(STscObj *pTscObj) {
if (pTscObj == NULL || pTscObj->pAppInfo->pTransporter == NULL) {
void closeTransporter(SAppInstInfo *pAppInfo) {
if (pAppInfo == NULL || pAppInfo->pTransporter == NULL) {
return;
}
tscDebug("free transporter:%p in connObj: 0x%" PRIx64, pTscObj->pAppInfo->pTransporter, pTscObj->id);
rpcClose(pTscObj->pAppInfo->pTransporter);
tscDebug("free transporter:%p in app inst %p", pAppInfo->pTransporter, pAppInfo);
rpcClose(pAppInfo->pTransporter);
}
static bool clientRpcRfp(int32_t code) {
......@@ -130,6 +130,21 @@ void closeAllRequests(SHashObj *pRequests) {
}
}
void destroyAppInst(SAppInstInfo* pAppInfo) {
tscDebug("destroy app inst mgr %p", pAppInfo);
hbRemoveAppHbMrg(&pAppInfo->pAppHbMgr);
taosHashRemove(appInfo.pInstMap, pAppInfo->instKey, strlen(pAppInfo->instKey));
taosMemoryFreeClear(pAppInfo->instKey);
closeTransporter(pAppInfo);
taosThreadMutexLock(&pAppInfo->qnodeMutex);
taosArrayDestroy(pAppInfo->pQnodeList);
taosThreadMutexUnlock(&pAppInfo->qnodeMutex);
taosMemoryFree(pAppInfo);
}
void destroyTscObj(void *pObj) {
STscObj *pTscObj = pObj;
......@@ -138,11 +153,12 @@ void destroyTscObj(void *pObj) {
int64_t connNum = atomic_sub_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
closeAllRequests(pTscObj->pRequests);
schedulerStopQueryHb(pTscObj->pAppInfo->pTransporter);
tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, remain inst totalConn:%" PRId64, pTscObj->id, pTscObj,
pTscObj->pAppInfo->numOfConns);
if (0 == connNum) {
closeTransporter(pTscObj);
destroyAppInst(pTscObj->pAppInfo);
}
tscDebug("connObj 0x%" PRIx64 " p:%p destroyed, totalConn:%" PRId64, pTscObj->id, pTscObj,
pTscObj->pAppInfo->numOfConns);
taosThreadMutexDestroy(&pTscObj->mutex);
taosMemoryFreeClear(pTscObj);
}
......@@ -174,6 +190,8 @@ void *createTscObj(const char *user, const char *auth, const char *db, int32_t c
pObj->id = taosAddRef(clientConnRefPool, pObj);
pObj->schemalessType = 1;
atomic_add_fetch_64(&pObj->pAppInfo->numOfConns, 1);
tscDebug("connObj created, 0x%" PRIx64 ",p:%p", pObj->id, pObj);
return pObj;
}
......
......@@ -790,22 +790,40 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
return pAppHbMgr;
}
void hbFreeAppHbMgr(SAppHbMgr *pTarget) {
void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq *pOneReq = pIter;
tFreeClientHbReq(pOneReq);
pIter = taosHashIterate(pTarget->activeInfo, pIter);
}
taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL;
taosMemoryFree(pTarget->key);
taosMemoryFree(pTarget);
}
void hbRemoveAppHbMrg(SAppHbMgr **pAppHbMgr) {
taosThreadMutexLock(&clientHbMgr.lock);
int32_t mgrSize = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int32_t i = 0; i < mgrSize; ++i) {
SAppHbMgr *pItem = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (pItem == *pAppHbMgr) {
hbFreeAppHbMgr(*pAppHbMgr);
*pAppHbMgr = NULL;
taosArrayRemove(clientHbMgr.appHbMgrs, i);
break;
}
}
taosThreadMutexUnlock(&clientHbMgr.lock);
}
void appHbMgrCleanup(void) {
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
for (int i = 0; i < sz; i++) {
SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
void *pIter = taosHashIterate(pTarget->activeInfo, NULL);
while (pIter != NULL) {
SClientHbReq *pOneReq = pIter;
tFreeClientHbReq(pOneReq);
pIter = taosHashIterate(pTarget->activeInfo, pIter);
}
taosHashCleanup(pTarget->activeInfo);
pTarget->activeInfo = NULL;
taosMemoryFree(pTarget->key);
taosMemoryFree(pTarget);
hbFreeAppHbMgr(pTarget);
}
}
......
......@@ -122,7 +122,10 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
p->pAppHbMgr = appHbMgrInit(p, key);
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
p->instKey = key;
key = NULL;
tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, ip, port);
pInst = &p;
}
......
......@@ -87,7 +87,6 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
// update the appInstInfo
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
pTscObj->connType = connectRsp.connType;
......
......@@ -789,6 +789,8 @@ _return:
int32_t ctgCallUserCb(void* param) {
SCtgJob* pJob = (SCtgJob*)param;
//taosSsleep(2);
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
......
......@@ -368,6 +368,7 @@ void *closeThreadFp(void *arg) {
while (true) {
if (qParam->taos) {
usleep(rand() % 10000);
//usleep(1000000);
taos_close(qParam->taos);
break;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册