提交 963a72ee 编写于 作者: dengyihao's avatar dengyihao

refactor code

上级 4cacb51e
...@@ -51,7 +51,7 @@ extern "C" { ...@@ -51,7 +51,7 @@ extern "C" {
#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
#define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",GTID: %s", __VA_ARGS__, buf);} while(0) #define dGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",gtid:%s", __VA_ARGS__, buf);} while(0)
typedef enum { typedef enum {
DNODE = 0, DNODE = 0,
......
...@@ -40,7 +40,7 @@ extern "C" { ...@@ -40,7 +40,7 @@ extern "C" {
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }} #define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
#define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0) #define mGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", gtid:%s", __VA_ARGS__, buf);} while(0)
// clang-format on // clang-format on
...@@ -70,7 +70,7 @@ typedef struct { ...@@ -70,7 +70,7 @@ typedef struct {
typedef struct { typedef struct {
SCacheObj *connCache; SCacheObj *connCache;
SCacheObj *appCache; SCacheObj *appCache;
} SProfileMgmt; } SProfileMgmt;
typedef struct { typedef struct {
......
...@@ -32,7 +32,13 @@ extern "C" { ...@@ -32,7 +32,13 @@ extern "C" {
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) #define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0) #define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0)
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) #define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
#define vGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param " GTID: %s", __VA_ARGS__, buf);} while(0)//#define vDye(...) do
#define vGTrace(param, ...) do { if (vDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param ", gtid:%s", __VA_ARGS__, buf);}} while(0)
#define vGFatal(param, ...) do { if (vDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vFatal(param ", gtid:%s", __VA_ARGS__, buf);}} while(0)
#define vGError(param, ...) do { if (vDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vError(param ", gtid:%s", __VA_ARGS__, buf);}} while(0)
#define vGWarn(param, ...) do { if (vDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vWarn(param ", gtid:%s", __VA_ARGS__, buf);}} while(0)
#define vGInfo(param, ...) do { if (vDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vInfo(param ", gtid:%s", __VA_ARGS__, buf);}} while(0)
#define vGDebug(param, ...) do { if (vDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vDebug(param ", gtid:%s", __VA_ARGS__, buf);}} while(0)
// clang-format on // clang-format on
// vnodeCfg.c // vnodeCfg.c
......
...@@ -183,6 +183,8 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co ...@@ -183,6 +183,8 @@ typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } Co
#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); #define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead));
#define transIsReq(type) (type & 1U) #define transIsReq(type) (type & 1U)
#define transLabel(trans) ((STrans*)trans)->label
// int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); // int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
// void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); // void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
//// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); //// int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
......
...@@ -32,8 +32,12 @@ extern "C" { ...@@ -32,8 +32,12 @@ extern "C" {
#define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0) #define tTrace(...) do {if (rpcDebugFlag & DEBUG_TRACE){ taosPrintLog("RPC ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }} while(0)
#define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0) #define tDump(x, y) do {if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } } while(0)
//#define tTR(param, ...) do { char buf[40] = {0};TRACE_TO_STR(trace, buf);tTrace("TRID: %s "param, buf, __VA_ARGS__);} while(0) #define tGTrace(param, ...) do { if (rpcDebugFlag & DEBUG_TRACE){char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", gtid:%s", __VA_ARGS__, buf);}} while(0)
#define tGTrace(param, ...) do { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", GTID: %s", __VA_ARGS__, buf);} while(0) #define tGFatal(param, ...) do {if (rpcDebugFlag & DEBUG_FATAL){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tFatal(param ", gtid:%s", __VA_ARGS__, buf); }} while (0)
#define tGError(param, ...) do { if (rpcDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tError(param ", gtid:%s", __VA_ARGS__, buf);} } while(0)
#define tGWarn(param, ...) do { if (rpcDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tWarn(param ", gtid:%s", __VA_ARGS__, buf); }} while(0)
#define tGInfo(param, ...) do { if (rpcDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tInfo(param ", gtid:%s", __VA_ARGS__, buf); }} while(0)
#define tGDebug(param,...) do {if (rpcDebugFlag & DEBUG_DEBUG){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tDebug(param ", gtid:%s", __VA_ARGS__, buf); }} while(0)
// clang-format on // clang-format on
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -477,9 +477,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -477,9 +477,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
return NULL; return NULL;
} }
queue* h = QUEUE_HEAD(&plist->conn); queue* h = QUEUE_HEAD(&plist->conn);
QUEUE_REMOVE(h); // //QUEUE_REMOVE(h);
SCliConn* conn = QUEUE_DATA(h, SCliConn, conn); SCliConn* conn = QUEUE_DATA(h, SCliConn, conn);
conn->status = ConnNormal; // conn->status = ConnNormal;
QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
return conn; return conn;
} }
...@@ -487,7 +488,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -487,7 +488,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SCliThrdObj* thrd = conn->hostThrd; SCliThrdObj* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd); CONN_HANDLE_THREAD_QUIT(thrd);
STrans* pTransInst = ((SCliThrdObj*)conn->hostThrd)->pTransInst; STrans* pTransInst = thrd->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
transQueueClear(&conn->cliMsgs); transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx); transCtxCleanup(&conn->ctx);
...@@ -500,6 +501,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -500,6 +501,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before // list already create before
assert(plist != NULL); assert(plist != NULL);
QUEUE_INIT(&conn->conn);
QUEUE_PUSH(&plist->conn, &conn->conn); QUEUE_PUSH(&plist->conn, &conn->conn);
assert(!QUEUE_IS_EMPTY(&plist->conn)); assert(!QUEUE_IS_EMPTY(&plist->conn));
} }
...@@ -563,6 +565,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { ...@@ -563,6 +565,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->stream, cliDestroy); uv_close((uv_handle_t*)conn->stream, cliDestroy);
} }
...@@ -778,11 +781,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -778,11 +781,11 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) {
int ret = transSetConnOption((uv_tcp_t*)conn->stream); int ret = transSetConnOption((uv_tcp_t*)conn->stream);
if (ret) { if (ret) {
tError("%s conn %p failed to set conn option, errmsg %s", pTransInst->label, conn, uv_err_name(ret)); tError("%s conn %p failed to set conn option, errmsg %s", transLabel(pTransInst), conn, uv_err_name(ret));
} }
int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT); int fd = taosCreateSocketWithTimeOutOpt(TRANS_CONN_TIMEOUT);
if (fd == -1) { if (fd == -1) {
tTrace("%s conn %p failed to create socket", pTransInst->label, conn); tTrace("%s conn %p failed to create socket", transLabel(pTransInst), conn);
cliHandleExcept(conn); cliHandleExcept(conn);
return; return;
} }
...@@ -1110,7 +1113,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1110,7 +1113,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0); ASSERT(transSendAsync(thrd->asyncPool, &(cliMsg->q)) == 0);
} }
...@@ -1143,7 +1146,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1143,7 +1146,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx]; SCliThrdObj* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[idx];
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", pTransInst->label, thrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), thrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transSendAsync(thrd->asyncPool, &(cliMsg->q)); transSendAsync(thrd->asyncPool, &(cliMsg->q));
......
...@@ -284,12 +284,12 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -284,12 +284,12 @@ static void uvHandleReq(SSvrConn* pConn) {
if (pConn->status == ConnNormal && pHead->noResp == 0) { if (pConn->status == ConnNormal && pHead->noResp == 0) {
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pConn), pConn,
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr),
ntohs(pConn->localAddr.sin_port), transMsg.contLen);
} else {
tGTrace("conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", pConn,
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen);
} else {
tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", transLabel(pConn),
pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp, taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp,
transMsg.code); transMsg.code);
// no ref here // no ref here
...@@ -304,7 +304,8 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -304,7 +304,8 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
tGTrace("handle %p conn: %p translated to app, refId: %" PRIu64 "", transMsg.info.handle, pConn, pConn->refId); tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pConn), transMsg.info.handle, pConn,
pConn->refId);
assert(transMsg.info.handle != NULL); assert(transMsg.info.handle != NULL);
if (pHead->noResp == 1) { if (pHead->noResp == 1) {
...@@ -330,12 +331,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -330,12 +331,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
tTrace("conn %p total read: %d, current read: %d", conn, pBuf->len, (int)nread); tTrace("%s conn %p total read: %d, current read: %d", transLabel(conn->pTransInst), conn, pBuf->len, (int)nread);
if (transReadComplete(pBuf)) { if (transReadComplete(pBuf)) {
tTrace("conn %p alread read complete packet", conn); tTrace("%s conn %p alread read complete packet", transLabel(conn->pTransInst), conn);
uvHandleReq(conn); uvHandleReq(conn);
} else { } else {
tTrace("conn %p read partial packet, continue to read", conn); tTrace("%s conn %p read partial packet, continue to read", transLabel(conn->pTransInst), conn);
} }
return; return;
} }
...@@ -343,12 +344,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { ...@@ -343,12 +344,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return; return;
} }
tError("conn %p read error: %s", conn, uv_err_name(nread)); tError("%s conn %p read error: %s", transLabel(conn->pTransInst), conn, uv_err_name(nread));
if (nread < 0) { if (nread < 0) {
conn->broken = true; conn->broken = true;
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
if (conn->regArg.init) { if (conn->regArg.init) {
tTrace("conn %p broken, notify server app", conn); tTrace("%s conn %p broken, notify server app", transLabel(conn->pTransInst), conn);
STrans* pTransInst = conn->pTransInst; STrans* pTransInst = conn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL);
memset(&conn->regArg, 0, sizeof(conn->regArg)); memset(&conn->regArg, 0, sizeof(conn->regArg));
...@@ -457,9 +458,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { ...@@ -457,9 +458,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
int32_t len = transMsgLenFromCont(pMsg->contLen); int32_t len = transMsgLenFromCont(pMsg->contLen);
STraceId* trace = &pMsg->info.traceId; STraceId* trace = &pMsg->info.traceId;
tGTrace("conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", pConn, TMSG_INFO(pHead->msgType), tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pConn->pTransInst), pConn,
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
ntohs(pConn->localAddr.sin_port), len); taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len);
pHead->msgLen = htonl(len); pHead->msgLen = htonl(len);
wb->base = msg; wb->base = msg;
...@@ -737,7 +738,7 @@ static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) { ...@@ -737,7 +738,7 @@ static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) {
// conn set // conn set
QUEUE_INIT(&pThrd->conn); QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb);
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
return true; return true;
...@@ -801,7 +802,7 @@ static SSvrConn* createConn(void* hThrd) { ...@@ -801,7 +802,7 @@ static SSvrConn* createConn(void* hThrd) {
pConn->refId = exh->refId; pConn->refId = exh->refId;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
tTrace("handle %p, conn %p created, refId: %" PRId64 "", exh, pConn, pConn->refId); tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pThrd->pTransInst), exh, pConn, pConn->refId);
return pConn; return pConn;
} }
...@@ -848,7 +849,7 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -848,7 +849,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
transReleaseExHandle(refMgt, conn->refId); transReleaseExHandle(refMgt, conn->refId);
transRemoveExHandle(refMgt, conn->refId); transRemoveExHandle(refMgt, conn->refId);
tDebug("conn %p destroy", conn); tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn);
// uv_timer_stop(&conn->pTimer); // uv_timer_stop(&conn->pTimer);
transQueueDestroy(&conn->srvMsgs); transQueueDestroy(&conn->srvMsgs);
...@@ -977,18 +978,18 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) { ...@@ -977,18 +978,18 @@ void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) {
uvStartSendRespInternal(msg); uvStartSendRespInternal(msg);
return; return;
} else if (conn->status == ConnRelease || conn->status == ConnNormal) { } else if (conn->status == ConnRelease || conn->status == ConnNormal) {
tDebug("conn %p already released, ignore release-msg", conn); tDebug("%s conn %p already released, ignore release-msg", transLabel(thrd->pTransInst), conn);
} }
destroySmsg(msg); destroySmsg(msg);
} }
void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) { void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) {
// send msg to client // send msg to client
tDebug("conn %p start to send resp (2/2)", msg->pConn); tDebug("%s conn %p start to send resp (2/2)", transLabel(thrd->pTransInst), msg->pConn);
uvStartSendResp(msg); uvStartSendResp(msg);
} }
void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) { void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
SSvrConn* conn = msg->pConn; SSvrConn* conn = msg->pConn;
tDebug("conn %p register brokenlink callback", conn); tDebug("%s conn %p register brokenlink callback", transLabel(thrd->pTransInst), conn);
if (conn->status == ConnAcquire) { if (conn->status == ConnAcquire) {
if (!transQueuePush(&conn->srvMsgs, msg)) { if (!transQueuePush(&conn->srvMsgs, msg)) {
return; return;
...@@ -1094,7 +1095,7 @@ void transReleaseSrvHandle(void* handle) { ...@@ -1094,7 +1095,7 @@ void transReleaseSrvHandle(void* handle) {
m->msg = tmsg; m->msg = tmsg;
m->type = Release; m->type = Release;
tTrace("conn %p start to release", exh->handle); tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transSendAsync(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
...@@ -1152,7 +1153,7 @@ void transRegisterMsg(const STransMsg* msg) { ...@@ -1152,7 +1153,7 @@ void transRegisterMsg(const STransMsg* msg) {
m->msg = tmsg; m->msg = tmsg;
m->type = Register; m->type = Register;
tTrace("conn %p start to register brokenlink callback", exh->handle); tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transSendAsync(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refMgt, refId);
return; return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册