diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 06d5a7da915d53969a7008ee2cb8755d84a55642..99f890d3a0288b6da16c8047267986213a43e5fd 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -120,8 +120,8 @@ typedef struct { // SEpSet* pSet; // for synchronous API } SRpcReqContext; -typedef SRpcMsg STransMsg; -typedef SRpcInfo STrans; +typedef SRpcMsg STransMsg; +typedef SRpcInfo STrans; typedef SRpcConnInfo STransHandleInfo; typedef struct { @@ -139,7 +139,7 @@ typedef struct { int64_t rid; // refId returned by taosAddRef STransMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API + tsem_t* pSem; // for synchronous API int hThrdIdx; char* ip; @@ -147,7 +147,6 @@ typedef struct { // SEpSet* pSet; // for synchronous API } STransConnCtx; - #pragma pack(push, 1) typedef struct { @@ -248,24 +247,21 @@ bool transReadComplete(SConnBuffer* connBuf); int transSetConnOption(uv_tcp_t* stream); - void transRefSrvHandle(void* handle); void transUnrefSrvHandle(void* handle); void transRefCliHandle(void* handle); void transUnrefCliHandle(void* handle); - -void transSendRequest(void *shandle, const char *ip, uint32_t port, STransMsg *pMsg); -void transSendRecv(void* shandle, const char *ip, uint32_t port, STransMsg *pMsg, STransMsg *pRsp); +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg); +void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg, STransMsg* pRsp); void transSendResponse(const STransMsg* pMsg); -int transGetConnInfo(void *thandle, STransHandleInfo *pInfo); - +int transGetConnInfo(void* thandle, STransHandleInfo* pInfo); void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); -void transCloseClient(void *arg); -void transCloseServer(void *arg); +void transCloseClient(void* arg); +void transCloseServer(void* arg); #endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c55d699e4f83c7eee2c7470a47b2fc74b365c44a..4af02a982ef30e150cb8556ae969067df3d4dd6d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -42,7 +42,7 @@ typedef struct SCliConn { typedef struct SCliMsg { STransConnCtx* ctx; - STransMsg msg; + STransMsg msg; queue q; uint64_t st; } SCliMsg; @@ -122,7 +122,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ do { \ if (thrd->quit) { \ - cliHandleExcept(conn); \ + cliHandleExcept(conn); \ goto _RETURE; \ } \ } while (0) @@ -130,19 +130,25 @@ static void destroyThrdObj(SCliThrdObj* pThrd); #define CONN_HANDLE_BROKEN(conn) \ do { \ if (conn->broken) { \ - cliHandleExcept(conn); \ + cliHandleExcept(conn); \ goto _RETURE; \ } \ } while (0); -#define CONN_SET_PERSIST_BY_APP(conn) do { if (conn->persist == false) { conn->persist = true; transRefCliHandle(conn);}} while(0) +#define CONN_SET_PERSIST_BY_APP(conn) \ + do { \ + if (conn->persist == false) { \ + conn->persist = true; \ + transRefCliHandle(conn); \ + } \ + } while (0) #define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) static void* cliWorkThread(void* arg); -void cliHandleResp(SCliConn* conn) { +void cliHandleResp(SCliConn* conn) { SCliThrdObj* pThrd = conn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); pHead->code = htonl(pHead->code); @@ -156,16 +162,16 @@ void cliHandleResp(SCliConn* conn) { rpcMsg.pCont = transContFromHead((char*)pHead); rpcMsg.code = pHead->code; rpcMsg.msgType = pHead->msgType; - rpcMsg.ahandle = NULL; + rpcMsg.ahandle = NULL; - SCliMsg* pMsg = conn->data; - STransConnCtx *pCtx = pMsg ? pMsg->ctx : NULL; + SCliMsg* pMsg = conn->data; + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; } else { - rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } - //if (rpcMsg.ahandle == NULL) { + // if (rpcMsg.ahandle == NULL) { // tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn); // return; //} @@ -207,26 +213,26 @@ void cliHandleResp(SCliConn* conn) { void cliHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { - if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { - transUnrefCliHandle(pConn); - return; - } + if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { + transUnrefCliHandle(pConn); + return; + } } SCliThrdObj* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; SCliMsg* pMsg = pConn->data; - STransConnCtx *pCtx = pMsg ? pMsg->ctx : NULL; + STransConnCtx* pCtx = pMsg ? pMsg->ctx : NULL; STransMsg rpcMsg = {0}; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; rpcMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; - rpcMsg.ahandle = NULL; + rpcMsg.ahandle = NULL; if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; } else { - rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; } if (pCtx == NULL || pCtx->pSem == NULL) { @@ -246,7 +252,7 @@ void cliHandleExcept(SCliConn* pConn) { void cliTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; - STrans* pRpc = pThrd->pTransInst; + STrans* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; tTrace("%s, cli conn timeout, try to remove expire conn from conn pool", pRpc->label); @@ -420,7 +426,7 @@ void cliSend(SCliConn* pConn) { STransConnCtx* pCtx = pCliMsg->ctx; SCliThrdObj* pThrd = pConn->hostThrd; - STrans* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; STransMsg* pMsg = (STransMsg*)(&pCliMsg->msg); @@ -513,7 +519,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("%s cli msg tran time cost: %" PRIu64 "us", ((STrans*)pThrd->pTransInst)->label, el); STransConnCtx* pCtx = pMsg->ctx; - STrans* pTransInst = pThrd->pTransInst; + STrans* pTransInst = pThrd->pTransInst; SCliConn* conn = cliGetConn(pMsg, pThrd); if (conn != NULL) { @@ -534,7 +540,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); } - + conn->hThrdIdx = pCtx->hThrdIdx; } static void cliAsyncCb(uv_async_t* handle) { @@ -543,7 +549,7 @@ static void cliAsyncCb(uv_async_t* handle) { SCliMsg* pMsg = NULL; // batch process to avoid to lock/unlock frequently - queue wq; + queue wq; pthread_mutex_lock(&item->mtx); QUEUE_MOVE(&item->qmsg, &wq); pthread_mutex_unlock(&item->mtx); @@ -689,9 +695,9 @@ void transUnrefCliHandle(void* handle) { } } -void transSendRequest(void *shandle, const char *ip, uint32_t port, STransMsg *pMsg) { +void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* pMsg) { STrans* pTransInst = (STrans*)shandle; - int index = CONN_HOST_THREAD_INDEX((SCliConn *)pMsg->handle); + int index = CONN_HOST_THREAD_INDEX((SCliConn*)pMsg->handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); } @@ -718,9 +724,9 @@ void transSendRequest(void *shandle, const char *ip, uint32_t port, STransMsg *p SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[index]; transSendAsync(thrd->asyncPool, &(cliMsg->q)); } -void transSendRecv(void* shandle, const char *ip, uint32_t port, STransMsg *pReq, STransMsg *pRsp) { +void transSendRecv(void* shandle, const char* ip, uint32_t port, STransMsg* pReq, STransMsg* pRsp) { STrans* pTransInst = (STrans*)shandle; - int index = CONN_HOST_THREAD_INDEX(pReq->handle); + int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { index = cliRBChoseIdx(pTransInst); } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index f9c014902c406a394f7af96239cc83bbf099907d..c236a69f4ee0663112797d34f6623776365abd70 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -48,7 +48,7 @@ typedef struct SSrvConn { typedef struct SSrvMsg { SSrvConn* pConn; - STransMsg msg; + STransMsg msg; queue q; } SSrvMsg; @@ -317,8 +317,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { // impl later; tTrace("server conn %p prepare to send resp", smsg->pConn); - SSrvConn* pConn = smsg->pConn; - STransMsg* pMsg = &smsg->msg; + SSrvConn* pConn = smsg->pConn; + STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); pMsg->contLen = 0; @@ -798,8 +798,8 @@ void transSendResponse(const STransMsg* pMsg) { tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); } -int transGetConnInfo(void *thandle, STransHandleInfo *pInfo) { - SSrvConn* pConn = thandle; +int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) { + SSrvConn* pConn = thandle; struct sockaddr_in addr = pConn->addr; pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);