diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 9eb51359690ab68e549ad7c1bcaad9c0ff62115d..843798817d57f152455a37df004e42968a76d42f 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -226,6 +226,7 @@ typedef struct { int index; int nAsync; uv_async_t* asyncs; + int8_t stop; } SAsyncPool; SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 32d1e7140e5346231f525069009b252236a7a0c2..557ed548f49b136d61f37b2d00381dcbc1ff5e04 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1020,6 +1020,7 @@ void cliSendQuit(SCliThrd* thrd) { SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); msg->type = Quit; transAsyncSend(thrd->asyncPool, &msg->q); + atomic_store_8(&thrd->asyncPool->stop, 1); } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { @@ -1238,7 +1239,9 @@ int transReleaseCliHandle(void* handle) { cmsg->msg = tmsg; cmsg->type = Release; - transAsyncSend(pThrd->asyncPool, &cmsg->q); + if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { + return -1; + } return 0; } @@ -1279,7 +1282,10 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran STraceId* trace = &pReq->info.traceId; tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); + if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) { + destroyCmsg(cliMsg); + return -1; + } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; } @@ -1323,7 +1329,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - transAsyncSend(pThrd->asyncPool, &(cliMsg->q)); + if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) { + destroyCmsg(cliMsg); + return -1; + } tsem_wait(sem); tsem_destroy(sem); taosMemoryFree(sem); @@ -1358,7 +1367,10 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid); - transAsyncSend(thrd->asyncPool, &(cliMsg->q)); + if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) { + destroyCmsg(cliMsg); + return -1; + } } transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); return 0; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 7c76f69f0cc147682e5c451b1459aa99086af652..c89bbd408bf1010eed8128e28aabdd55616d1784 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -177,7 +177,6 @@ int transSetConnOption(uv_tcp_t* stream) { SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); - pool->index = 0; pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); @@ -207,6 +206,9 @@ void transDestroyAsyncPool(SAsyncPool* pool) { taosMemoryFree(pool); } int transAsyncSend(SAsyncPool* pool, queue* q) { + if (atomic_load_8(&pool->stop) == 1) { + return -1; + } int idx = pool->index; idx = idx % pool->nAsync; // no need mutex here