diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 963a85922f340d86bea58b30a2dd5cabba17d8b5..55db0b129a1cfb7f35984b8720fb7d5b35311e70 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -229,7 +229,7 @@ typedef struct { SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); void transDestroyAsyncPool(SAsyncPool* pool); -int transSendAsync(SAsyncPool* pool, queue* mq); +int transAsyncSend(SAsyncPool* pool, queue* mq); #define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \ do { \ diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index ab08ce82a88677a33aa4e0c49b19ee0e71f46fc3..aba2e6957b51c62765fc155333e060e7804bd3de 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -967,7 +967,7 @@ void cliSendQuit(SCliThrd* thrd) { // cli can stop gracefully SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); msg->type = Quit; - transSendAsync(thrd->asyncPool, &msg->q); + transAsyncSend(thrd->asyncPool, &msg->q); } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { @@ -1138,7 +1138,7 @@ void transReleaseCliHandle(void* handle) { cmsg->msg = tmsg; cmsg->type = Release; - transSendAsync(pThrd->asyncPool, &cmsg->q); + transAsyncSend(pThrd->asyncPool, &cmsg->q); return; } @@ -1171,7 +1171,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra STraceId* trace = &pReq->info.traceId; tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - ASSERT(transSendAsync(pThrd->asyncPool, &(cliMsg->q)) == 0); + ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); return; } @@ -1205,7 +1205,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - transSendAsync(pThrd->asyncPool, &(cliMsg->q)); + transAsyncSend(pThrd->asyncPool, &(cliMsg->q)); tsem_wait(sem); tsem_destroy(sem); taosMemoryFree(sem); @@ -1234,7 +1234,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid); - transSendAsync(thrd->asyncPool, &(cliMsg->q)); + transAsyncSend(thrd->asyncPool, &(cliMsg->q)); } } #endif diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index bff7d79bd38fd6cd8afabf6b005e17477090bcf8..fbe0951a4655148301d4b7351fd1010ac400bd45 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -202,7 +202,7 @@ void transDestroyAsyncPool(SAsyncPool* pool) { taosMemoryFree(pool->asyncs); taosMemoryFree(pool); } -int transSendAsync(SAsyncPool* pool, queue* q) { +int transAsyncSend(SAsyncPool* pool, queue* q) { int idx = pool->index; idx = idx % pool->nAsync; // no need mutex here diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 892d32696eba25a909b60ba0f5a37368c2b7186c..08363b3c7c81f0c90e61841baf1b1cfe729b103a 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -999,7 +999,7 @@ void sendQuitToWorkThrd(SWorkThrd* pThrd) { SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); msg->type = Quit; tDebug("server send quit msg to work thread"); - transSendAsync(pThrd->asyncPool, &msg->q); + transAsyncSend(pThrd->asyncPool, &msg->q); } void transCloseServer(void* arg) { @@ -1070,7 +1070,7 @@ void transReleaseSrvHandle(void* handle) { m->type = Release; tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); - transSendAsync(pThrd->asyncPool, &m->q); + transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(refMgt, refId); return; _return1: @@ -1099,7 +1099,7 @@ void transSendResponse(const STransMsg* msg) { STraceId* trace = (STraceId*)&msg->info.traceId; tGTrace("conn %p start to send resp (1/2)", exh->handle); - transSendAsync(pThrd->asyncPool, &m->q); + transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(refMgt, refId); return; _return1: @@ -1128,7 +1128,7 @@ void transRegisterMsg(const STransMsg* msg) { m->type = Register; tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); - transSendAsync(pThrd->asyncPool, &m->q); + transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(refMgt, refId); return;