diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 140d2ef792a878ebe70f759c6863f536ad1f7b9f..11ba2a50d38a233985a272023ff8952c039cada1 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -16,7 +16,8 @@ #include "transComm.h" typedef struct SConnList { - queue conn; + queue conn; + int32_t size; } SConnList; typedef struct SCliConn { @@ -518,15 +519,18 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { if (QUEUE_IS_EMPTY(&plist->conn)) { return NULL; } + + plist->size -= 1; queue* h = QUEUE_HEAD(&plist->conn); SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); - transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); - conn->task = NULL; - + if (conn->task != NULL) { + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + conn->task = NULL; + } return conn; } static void addConnToPool(void* pool, SCliConn* conn) { @@ -555,13 +559,17 @@ static void addConnToPool(void* pool, SCliConn* conn) { assert(conn->list != NULL); QUEUE_INIT(&conn->q); QUEUE_PUSH(&conn->list->conn, &conn->q); + conn->list->size += 1; + conn->task = NULL; assert(!QUEUE_IS_EMPTY(&conn->list->conn)); - STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); - arg->param1 = conn; - arg->param2 = thrd; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); + if (conn->list->size >= 10) { + STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + arg->param1 = conn; + arg->param2 = thrd; + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); + } } static int32_t allocConnRef(SCliConn* conn, bool update) { if (update) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 8b27d95e520d900539775d36f6aa9cd19c2300bd..14b8b35478ba366a7fa632836dc63dc187b77df7 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -75,7 +75,6 @@ typedef struct SWorkThrd { SAsyncPool* asyncPool; uv_prepare_t* prepare; queue msg; - TdThreadMutex msgMtx; queue conn; void* pTransInst; @@ -499,6 +498,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { tError("unexcept occurred, continue"); continue; } + // release handle to rpc init if (msg->type == Quit) { (*transAsyncHandle[msg->type])(msg, pThrd); @@ -743,7 +743,6 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { pThrd->pipe->data = pThrd; QUEUE_INIT(&pThrd->msg); - taosThreadMutexInit(&pThrd->msgMtx, NULL); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); uv_prepare_init(pThrd->loop, pThrd->prepare); diff --git a/source/libs/transport/test/svrBench.c b/source/libs/transport/test/svrBench.c index 224f527385730dd8a7344d32270ce7883e3280c6..6eb80c8504d68b066d527393a7ba36175f737a23 100644 --- a/source/libs/transport/test/svrBench.c +++ b/source/libs/transport/test/svrBench.c @@ -75,15 +75,14 @@ void processShellMsg() { void *handle = pRpcMsg->info.handle; taosFreeQitem(pRpcMsg); - - { - SRpcMsg nRpcMsg = {0}; - nRpcMsg.pCont = rpcMallocCont(msgSize); - nRpcMsg.contLen = msgSize; - nRpcMsg.info.handle = handle; - nRpcMsg.code = TSDB_CODE_CTG_NOT_READY; - rpcSendResponse(&nRpcMsg); - } + //{ + // SRpcMsg nRpcMsg = {0}; + // nRpcMsg.pCont = rpcMallocCont(msgSize); + // nRpcMsg.contLen = msgSize; + // nRpcMsg.info.handle = handle; + // nRpcMsg.code = TSDB_CODE_CTG_NOT_READY; + // rpcSendResponse(&nRpcMsg); + //} } taosUpdateItemSize(qinfo.queue, numOfMsgs);