提交 0c534273 编写于 作者: dengyihao's avatar dengyihao

feat: refactor rpc code

上级 ddf0ff11
...@@ -229,7 +229,7 @@ typedef struct { ...@@ -229,7 +229,7 @@ typedef struct {
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool); void transDestroyAsyncPool(SAsyncPool* pool);
int transSendAsync(SAsyncPool* pool, queue* mq); int transAsyncSend(SAsyncPool* pool, queue* mq);
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \ #define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \
do { \ do { \
......
...@@ -967,7 +967,7 @@ void cliSendQuit(SCliThrd* thrd) { ...@@ -967,7 +967,7 @@ void cliSendQuit(SCliThrd* thrd) {
// cli can stop gracefully // cli can stop gracefully
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
msg->type = Quit; msg->type = Quit;
transSendAsync(thrd->asyncPool, &msg->q); transAsyncSend(thrd->asyncPool, &msg->q);
} }
void cliWalkCb(uv_handle_t* handle, void* arg) { void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
...@@ -1138,7 +1138,7 @@ void transReleaseCliHandle(void* handle) { ...@@ -1138,7 +1138,7 @@ void transReleaseCliHandle(void* handle) {
cmsg->msg = tmsg; cmsg->msg = tmsg;
cmsg->type = Release; cmsg->type = Release;
transSendAsync(pThrd->asyncPool, &cmsg->q); transAsyncSend(pThrd->asyncPool, &cmsg->q);
return; return;
} }
...@@ -1171,7 +1171,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1171,7 +1171,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transSendAsync(pThrd->asyncPool, &(cliMsg->q)) == 0); ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0);
return; return;
} }
...@@ -1205,7 +1205,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1205,7 +1205,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transSendAsync(pThrd->asyncPool, &(cliMsg->q)); transAsyncSend(pThrd->asyncPool, &(cliMsg->q));
tsem_wait(sem); tsem_wait(sem);
tsem_destroy(sem); tsem_destroy(sem);
taosMemoryFree(sem); taosMemoryFree(sem);
...@@ -1234,7 +1234,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { ...@@ -1234,7 +1234,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid); tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid);
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transAsyncSend(thrd->asyncPool, &(cliMsg->q));
} }
} }
#endif #endif
...@@ -202,7 +202,7 @@ void transDestroyAsyncPool(SAsyncPool* pool) { ...@@ -202,7 +202,7 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
taosMemoryFree(pool->asyncs); taosMemoryFree(pool->asyncs);
taosMemoryFree(pool); taosMemoryFree(pool);
} }
int transSendAsync(SAsyncPool* pool, queue* q) { int transAsyncSend(SAsyncPool* pool, queue* q) {
int idx = pool->index; int idx = pool->index;
idx = idx % pool->nAsync; idx = idx % pool->nAsync;
// no need mutex here // no need mutex here
......
...@@ -999,7 +999,7 @@ void sendQuitToWorkThrd(SWorkThrd* pThrd) { ...@@ -999,7 +999,7 @@ void sendQuitToWorkThrd(SWorkThrd* pThrd) {
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
msg->type = Quit; msg->type = Quit;
tDebug("server send quit msg to work thread"); tDebug("server send quit msg to work thread");
transSendAsync(pThrd->asyncPool, &msg->q); transAsyncSend(pThrd->asyncPool, &msg->q);
} }
void transCloseServer(void* arg) { void transCloseServer(void* arg) {
...@@ -1070,7 +1070,7 @@ void transReleaseSrvHandle(void* handle) { ...@@ -1070,7 +1070,7 @@ void transReleaseSrvHandle(void* handle) {
m->type = Release; m->type = Release;
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return1: _return1:
...@@ -1099,7 +1099,7 @@ void transSendResponse(const STransMsg* msg) { ...@@ -1099,7 +1099,7 @@ void transSendResponse(const STransMsg* msg) {
STraceId* trace = (STraceId*)&msg->info.traceId; STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle); tGTrace("conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
_return1: _return1:
...@@ -1128,7 +1128,7 @@ void transRegisterMsg(const STransMsg* msg) { ...@@ -1128,7 +1128,7 @@ void transRegisterMsg(const STransMsg* msg) {
m->type = Register; m->type = Register;
tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册