diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index b5b99e92b07b155ab69a6d41f3d793ed697dab7f..09f614ddb25700bbfc9d9d2ab521e783f427d965 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1460,6 +1460,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { tscError("failed to sched msg to tsc, tsc ready to quit"); rpcFreeCont(pMsg->pCont); taosMemoryFree(arg->pEpset); + destroySendMsgInfo(pMsg->info.ahandle); taosMemoryFree(arg); } } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1a99db5f992d51f5e01ba5120e2099f1d53a307f..954303e3c14fa4299102473598d9453e98107070 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1275,7 +1275,11 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < cli->numOfThreads; i++) { SCliThrd* pThrd = createThrdObj(shandle); - int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); + if (pThrd == NULL) { + return NULL; + } + + int err = taosThreadCreate(&pThrd->thread, NULL, cliWorkThread, (void*)(pThrd)); if (err == 0) { tDebug("success to create tranport-cli thread:%d", i); } @@ -1332,9 +1336,22 @@ static SCliThrd* createThrdObj(void* trans) { taosThreadMutexInit(&pThrd->msgMtx, NULL); pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); - uv_loop_init(pThrd->loop); - + int err = uv_loop_init(pThrd->loop); + if (err != 0) { + tError("failed to init uv_loop, reason:%s", uv_err_name(err)); + taosMemoryFree(pThrd->loop); + taosThreadMutexDestroy(&pThrd->msgMtx); + taosMemoryFree(pThrd); + return NULL; + } pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 8, pThrd, cliAsyncCb); + if (pThrd->asyncPool == NULL) { + uv_loop_close(pThrd->loop); + taosMemoryFree(pThrd->loop); + taosThreadMutexDestroy(&pThrd->msgMtx); + taosMemoryFree(pThrd); + return NULL; + } pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); uv_prepare_init(pThrd->loop, pThrd->prepare); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 1161ed7c00408a9559146ba532c74055813f5c5d..8c9e8f5a6009136ca8eb6b4f647cc11ea3bbf048 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -214,24 +214,37 @@ SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); - for (int i = 0; i < pool->nAsync; i++) { + int i = 0, err = 0; + for (i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = taosMemoryCalloc(1, sizeof(SAsyncItem)); item->pThrd = arg; QUEUE_INIT(&item->qmsg); taosThreadMutexInit(&item->mtx, NULL); - uv_async_t* async = &(pool->asyncs[i]); - uv_async_init(loop, async, cb); async->data = item; + err = uv_async_init(loop, async, cb); + if (err != 0) { + tError("failed to init async, reason:%s", uv_err_name(err)); + break; + } } + + if (i != pool->nAsync) { + transAsyncPoolDestroy(pool); + pool = NULL; + } + return pool; } void transAsyncPoolDestroy(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); - SAsyncItem* item = async->data; + if (item == NULL) continue; + taosThreadMutexDestroy(&item->mtx); taosMemoryFree(item); }