From 3bf28e189451ac21afc95cbd455a20e42fb3884c Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 25 Feb 2023 15:45:14 +0800 Subject: [PATCH] fix: limit session num --- source/libs/transport/src/transCli.c | 55 ++++++++++++++-------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 182d2b59b8..08bb43aa90 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -23,7 +23,6 @@ typedef struct SConnList { queue conns; int32_t size; SMsgList* list; - void* pThrd; } SConnList; typedef struct { @@ -142,7 +141,7 @@ typedef struct { // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); -static void* destroyConnPool(void* pool); +static void* destroyConnPool(SCliThrd* thread); static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); @@ -547,9 +546,9 @@ void* createConnPool(int size) { // thread local, no lock return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); } -void* destroyConnPool(void* pool) { +void* destroyConnPool(SCliThrd* pThrd) { + void* pool = pThrd->pool; SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); - SCliThrd* pThrd = connList->pThrd; while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conns)) { queue* h = QUEUE_HEAD(&connList->conns); @@ -582,29 +581,31 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); STrans* pTranInst = pThrd->pTransInst; if (plist == NULL) { + SConnList list = {0}; + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key)); + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); nList->numOfConn++; - SConnList list = {0}; - QUEUE_INIT(&list.conns); - list.list = nList; - - taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - return NULL; + QUEUE_INIT(&plist->conns); + plist->list = nList; } - SMsgList* msglist = plist->list; - if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) { - *exceed = true; + if (QUEUE_IS_EMPTY(&plist->conns)) { + if (plist->list->numOfConn >= pTranInst->connLimitNum) { + *exceed = true; + } return NULL; } + queue* h = QUEUE_HEAD(&plist->conns); + QUEUE_REMOVE(h); plist->size -= 1; - queue* h = QUEUE_HEAD(&plist->conns); + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; - QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); if (conn->task != NULL) { @@ -619,23 +620,21 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { STrans* pTransInst = pThrd->pTransInst; SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { + SConnList list = {0}; + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key)); + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); nList->numOfConn++; - SConnList list = {0}; - QUEUE_INIT(&list.conns); - list.list = nList; - - taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - - return NULL; + QUEUE_INIT(&plist->conns); + plist->list = nList; } - SMsgList* list = plist->list; // no avaliable conn in pool if (QUEUE_IS_EMPTY(&plist->conns)) { + SMsgList* list = plist->list; if ((list)->numOfConn >= pTransInst->connLimitNum) { STraceId* trace = &(*pMsg)->msg.info.traceId; @@ -669,11 +668,12 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { return NULL; } + queue* h = QUEUE_HEAD(&plist->conns); plist->size -= 1; - queue* h = QUEUE_HEAD(&plist->conns); + QUEUE_REMOVE(h); + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; - QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); if (conn->task != NULL) { @@ -686,6 +686,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; } + tError("add conn to pool"); allocConnRef(conn, true); SCliThrd* thrd = conn->hostThrd; @@ -1347,7 +1348,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { tDebug("cli work thread %p start to quit", pThrd); destroyCmsg(pMsg); - destroyConnPool(pThrd->pool); + destroyConnPool(pThrd); uv_walk(pThrd->loop, cliWalkCb, NULL); } static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { -- GitLab