From 963a72eed3fb683b5079f103b9643cf8a4d082ba Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 21 Jun 2022 18:31:50 +0800 Subject: [PATCH] refactor code --- source/dnode/mgmt/node_util/inc/dmUtil.h | 2 +- source/dnode/mnode/impl/inc/mndInt.h | 4 +-- source/dnode/vnode/src/inc/vnd.h | 8 ++++- source/libs/transport/inc/transComm.h | 2 ++ source/libs/transport/inc/transLog.h | 8 +++-- source/libs/transport/src/transCli.c | 17 +++++---- source/libs/transport/src/transSvr.c | 45 ++++++++++++------------ 7 files changed, 51 insertions(+), 35 deletions(-) diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 7897f62f62..c3238d83b3 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -51,7 +51,7 @@ extern "C" { #define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} -#define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",GTID: %s", __VA_ARGS__, buf);} while(0) +#define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",gtid:%s", __VA_ARGS__, buf);} while(0) typedef enum { DNODE = 0, diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index c810a0cbc7..5d508be237 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -40,7 +40,7 @@ extern "C" { #define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} -#define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0) +#define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);} while(0) // clang-format on @@ -70,7 +70,7 @@ typedef struct { typedef struct { SCacheObj *connCache; - SCacheObj *appCache; + SCacheObj *appCache; } SProfileMgmt; typedef struct { diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 9339402d43..9b1c2e456d 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -32,7 +32,13 @@ extern "C" { #define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0) #define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) -#define vGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param " GTID: %s", __VA_ARGS__, buf);} while(0)//#define vDye(...) do + +#define vGTrace(param, ...) do { if (vDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param ", gtid:%s", __VA_ARGS__, buf);}} while(0) +#define vGFatal(param, ...) do { if (vDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vFatal(param ", gtid:%s", __VA_ARGS__, buf);}} while(0) +#define vGError(param, ...) do { if (vDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vError(param ", gtid:%s", __VA_ARGS__, buf);}} while(0) +#define vGWarn(param, ...) do { if (vDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vWarn(param ", gtid:%s", __VA_ARGS__, buf);}} while(0) +#define vGInfo(param, ...) do { if (vDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vInfo(param ", gtid:%s", __VA_ARGS__, buf);}} while(0) +#define vGDebug(param, ...) do { if (vDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vDebug(param ", gtid:%s", __VA_ARGS__, buf);}} while(0) // clang-format on // vnodeCfg.c diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 146b127422..327fe50814 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -183,6 +183,8 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co #define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); #define transIsReq(type) (type & 1U) +#define transLabel(trans) ((STrans*)trans)->label + // int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); // void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); //// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); diff --git a/source/libs/transport/inc/transLog.h b/source/libs/transport/inc/transLog.h index 9947ba803f..9d5c80d8e6 100644 --- a/source/libs/transport/inc/transLog.h +++ b/source/libs/transport/inc/transLog.h @@ -32,8 +32,12 @@ extern "C" { #define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0) #define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0) -//#define tTR(param, ...) do { char buf[40] = {0};TRACE_TO_STR(trace, buf);tTrace("TRID: %s "param, buf, __VA_ARGS__);} while(0) -#define tGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0) +#define tGTrace(param, ...) do { if (rpcDebugFlag & DEBUG_TRACE){char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", gtid:%s", __VA_ARGS__, buf);}} while(0) +#define tGFatal(param, ...) do {if (rpcDebugFlag & DEBUG_FATAL){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tFatal(param ", gtid:%s", __VA_ARGS__, buf); }} while (0) +#define tGError(param, ...) do { if (rpcDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tError(param ", gtid:%s", __VA_ARGS__, buf);} } while(0) +#define tGWarn(param, ...) do { if (rpcDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tWarn(param ", gtid:%s", __VA_ARGS__, buf); }} while(0) +#define tGInfo(param, ...) do { if (rpcDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tInfo(param ", gtid:%s", __VA_ARGS__, buf); }} while(0) +#define tGDebug(param,...) do {if (rpcDebugFlag & DEBUG_DEBUG){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tDebug(param ", gtid:%s", __VA_ARGS__, buf); }} while(0) // clang-format on #ifdef __cplusplus diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 912c8b59c2..852ffc9a0e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -477,9 +477,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { return NULL; } queue* h = QUEUE_HEAD(&plist->conn); - QUEUE_REMOVE(h); + // //QUEUE_REMOVE(h); SCliConn* conn = QUEUE_DATA(h, SCliConn, conn); - conn->status = ConnNormal; + // conn->status = ConnNormal; + QUEUE_REMOVE(&conn->conn); QUEUE_INIT(&conn->conn); return conn; } @@ -487,7 +488,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { SCliThrdObj* thrd = conn->hostThrd; CONN_HANDLE_THREAD_QUIT(thrd); - STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst; + STrans* pTransInst = thrd->pTransInst; conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); transQueueClear(&conn->cliMsgs); transCtxCleanup(&conn->ctx); @@ -500,6 +501,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); // list already create before assert(plist != NULL); + QUEUE_INIT(&conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn); assert(!QUEUE_IS_EMPTY(&plist->conn)); } @@ -563,6 +565,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); + QUEUE_INIT(&conn->conn); if (clear) { uv_close((uv_handle_t*)conn->stream, cliDestroy); } @@ -778,11 +781,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { int ret = transSetConnOption((uv_tcp_t*)conn->stream); if (ret) { - tError("%s conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); + tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret)); } int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT); if (fd == -1) { - tTrace("%s conn %p failed to create socket", pTransInst->label, conn); + tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn); cliHandleExcept(conn); return; } @@ -1110,7 +1113,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx]; STraceId* trace = &pReq->info.traceId; - tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid, + tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); } @@ -1143,7 +1146,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx]; STraceId* trace = &pReq->info.traceId; - tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid, + tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); transSendAsync(thrd->asyncPool, &(cliMsg->q)); diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index f9558d7252..593a790a21 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -284,12 +284,12 @@ static void uvHandleReq(SSvrConn* pConn) { if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), - ntohs(pConn->localAddr.sin_port), transMsg.contLen); - } else { - tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", pConn, + tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pConn), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen); + } else { + tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", transLabel(pConn), + pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp, transMsg.code); // no ref here @@ -304,7 +304,8 @@ static void uvHandleReq(SSvrConn* pConn) { transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; - tGTrace("handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId); + tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pConn), transMsg.info.handle, pConn, + pConn->refId); assert(transMsg.info.handle != NULL); if (pHead->noResp == 1) { @@ -330,12 +331,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; - tTrace("conn %p total read: %d, current read: %d", conn, pBuf->len, (int)nread); + tTrace("%s conn %p total read: %d, current read: %d", transLabel(conn->pTransInst), conn, pBuf->len, (int)nread); if (transReadComplete(pBuf)) { - tTrace("conn %p alread read complete packet", conn); + tTrace("%s conn %p alread read complete packet", transLabel(conn->pTransInst), conn); uvHandleReq(conn); } else { - tTrace("conn %p read partial packet, continue to read", conn); + tTrace("%s conn %p read partial packet, continue to read", transLabel(conn->pTransInst), conn); } return; } @@ -343,12 +344,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { return; } - tError("conn %p read error: %s", conn, uv_err_name(nread)); + tError("%s conn %p read error: %s", transLabel(conn->pTransInst), conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; if (conn->status == ConnAcquire) { if (conn->regArg.init) { - tTrace("conn %p broken, notify server app", conn); + tTrace("%s conn %p broken, notify server app", transLabel(conn->pTransInst), conn); STrans* pTransInst = conn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); @@ -457,9 +458,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { int32_t len = transMsgLenFromCont(pMsg->contLen); STraceId* trace = &pMsg->info.traceId; - tGTrace("conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), - ntohs(pConn->localAddr.sin_port), len); + tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pConn->pTransInst), pConn, + TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len); pHead->msgLen = htonl(len); wb->base = msg; @@ -737,7 +738,7 @@ static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) { // conn set QUEUE_INIT(&pThrd->conn); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); + pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; @@ -801,7 +802,7 @@ static SSvrConn* createConn(void* hThrd) { pConn->refId = exh->refId; transRefSrvHandle(pConn); - tTrace("handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId); + tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pThrd->pTransInst), exh, pConn, pConn->refId); return pConn; } @@ -848,7 +849,7 @@ static void uvDestroyConn(uv_handle_t* handle) { transReleaseExHandle(refMgt, conn->refId); transRemoveExHandle(refMgt, conn->refId); - tDebug("conn %p destroy", conn); + tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn); // uv_timer_stop(&conn->pTimer); transQueueDestroy(&conn->srvMsgs); @@ -977,18 +978,18 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) { uvStartSendRespInternal(msg); return; } else if (conn->status == ConnRelease || conn->status == ConnNormal) { - tDebug("conn %p already released, ignore release-msg", conn); + tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn); } destroySmsg(msg); } void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) { // send msg to client - tDebug("conn %p start to send resp (2/2)", msg->pConn); + tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pTransInst), msg->pConn); uvStartSendResp(msg); } void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) { SSvrConn* conn = msg->pConn; - tDebug("conn %p register brokenlink callback", conn); + tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pTransInst), conn); if (conn->status == ConnAcquire) { if (!transQueuePush(&conn->srvMsgs, msg)) { return; @@ -1094,7 +1095,7 @@ void transReleaseSrvHandle(void* handle) { m->msg = tmsg; m->type = Release; - tTrace("conn %p start to release", exh->handle); + tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); transSendAsync(pThrd->asyncPool, &m->q); transReleaseExHandle(refMgt, refId); return; @@ -1152,7 +1153,7 @@ void transRegisterMsg(const STransMsg* msg) { m->msg = tmsg; m->type = Register; - tTrace("conn %p start to register brokenlink callback", exh->handle); + tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); transSendAsync(pThrd->asyncPool, &m->q); transReleaseExHandle(refMgt, refId); return; -- GitLab