From e5d767cf81de3f9f7d8a0d3669687fbf11ed5a25 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Feb 2022 16:29:44 +0800 Subject: [PATCH] add trace log --- source/libs/transport/src/transCli.c | 40 +++++++++++++++----------- source/libs/transport/src/transSrv.c | 42 ++++++++++++++-------------- 2 files changed, 44 insertions(+), 38 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e87a7a2ed4..decd0af484 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -32,6 +32,8 @@ typedef struct SCliConn { uint64_t expireTime; int8_t notifyCount; // timers already notify to client int32_t ref; + + struct sockaddr_in addr; } SCliConn; typedef struct SCliMsg { @@ -124,7 +126,8 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; - tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), pMsg->ctx->ip, pMsg->ctx->port); + tDebug("client conn %p %s received from %s:%d", conn, TMSG_INFO(pHead->msgType), inet_ntoa(conn->addr.sin_addr), + ntohs(conn->addr.sin_port)); if (pCtx->pSem == NULL) { tTrace("client conn(sync) %p handle resp", conn); (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); @@ -156,7 +159,7 @@ static void clientHandleExcept(SCliConn* pConn) { clientConnDestroy(pConn, true); return; } - tDebug("client conn %p start to destroy", pConn); + tTrace("client conn %p start to destroy", pConn); SCliMsg* pMsg = pConn->data; destroyUserdata(&pMsg->msg); @@ -186,7 +189,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tDebug("client conn timeout, try to remove expire conn from conn pool"); + tTrace("client conn timeout, try to remove expire conn from conn pool"); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { @@ -255,7 +258,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - tDebug("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); + tTrace("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; @@ -296,10 +299,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf pBuf->len += nread; if (clientReadComplete(pBuf)) { uv_read_stop((uv_stream_t*)conn->stream); - tDebug("client conn %p read complete", conn); + tTrace("client conn %p read complete", conn); clientHandleResp(conn); } else { - tDebug("client conn %p read partial packet, continue to read", conn); + tTrace("client conn %p read partial packet, continue to read", conn); } return; } @@ -322,9 +325,8 @@ static void clientConnDestroy(SCliConn* conn, bool clear) { // conn->ref--; if (conn->ref == 0) { - tDebug("client conn %p remove from conn pool", conn); + tTrace("client conn %p remove from conn pool", conn); QUEUE_REMOVE(&conn->conn); - tDebug("client conn %p remove from conn pool successfully", conn); if (clear) { uv_close((uv_handle_t*)conn->stream, clientDestroy); } @@ -336,7 +338,7 @@ static void clientDestroy(uv_handle_t* handle) { free(conn->stream); free(conn->writeReq); - tDebug("client conn %p destroy successfully", conn); + tTrace("client conn %p destroy successfully", conn); free(conn); // clientConnDestroy(conn, false); @@ -345,7 +347,7 @@ static void clientDestroy(uv_handle_t* handle) { static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; if (status == 0) { - tDebug("client conn %p data already was written out", pConn); + tTrace("client conn %p data already was written out", pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { // handle @@ -372,7 +374,8 @@ static void clientWrite(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), pCliMsg->ctx->ip, pCliMsg->ctx->port); + tDebug("client conn %p %s is send to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port)); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { @@ -384,14 +387,17 @@ static void clientConnCb(uv_connect_t* req, int status) { clientHandleExcept(pConn); return; } - tDebug("client conn %p create", pConn); + int addrlen = sizeof(pConn->addr); + uv_tcp_getpeername((uv_tcp_t*)pConn->stream, (struct sockaddr*)&pConn->addr, &addrlen); + + tTrace("client conn %p create", pConn); assert(pConn->stream == req->handle); clientWrite(pConn); } static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { - tDebug("thread %p start to quit", pThrd); + tDebug("client work thread %p start to quit", pThrd); destroyCmsg(pMsg); // transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL); uv_timer_stop(pThrd->timer); @@ -402,14 +408,14 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tDebug("client msg tran time cost: %" PRIu64 "", el); + tTrace("client msg tran time cost: %" PRIu64 "", el); et = taosGetTimestampUs(); STransConnCtx* pCtx = pMsg->ctx; SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { // impl later - tDebug("client get conn %p from pool", conn); + tTrace("client get conn %p from pool", conn); conn->data = pMsg; conn->writeReq->data = conn; transDestroyBuffer(&conn->readBuf); @@ -470,7 +476,7 @@ static void clientAsyncCb(uv_async_t* handle) { count++; } if (count >= 2) { - tDebug("already process batch size: %d", count); + tTrace("client process batch size: %d", count); } } @@ -532,7 +538,7 @@ static SCliThrdObj* createThrdObj() { uv_timer_init(pThrd->loop, pThrd->timer); pThrd->timer->data = pThrd; - pThrd->pool = creatConnPool(1); + pThrd->pool = creatConnPool(4); pThrd->quit = false; return pThrd; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 0389e3584e..b8bbea92ce 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -124,7 +124,7 @@ void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b transAllocBuffer(pBuf, buf); } -// check data read from socket completely or not +// check data read from socket complete or not // static bool readComplete(SConnBuffer* data) { // TODO(yihao): handle pipeline later @@ -258,7 +258,7 @@ static void uvHandleReq(SSrvConn* pConn) { transClearBuffer(&pConn->readBuf); pConn->ref++; - tDebug("%p %s received from %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr), + tDebug("server conn %p %s received from %s:%d", pConn, TMSG_INFO(rpcMsg.msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port)); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); @@ -272,12 +272,12 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; - tTrace("conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread); + tTrace("server conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread); if (readComplete(pBuf)) { - tTrace("conn %p alread read complete packet", conn); + tTrace("server conn %p alread read complete packet", conn); uvHandleReq(conn); } else { - tTrace("conn %p read partial packet, continue to read", conn); + tTrace("server %p read partial packet, continue to read", conn); } return; } @@ -288,7 +288,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { if (conn->ref > 1) { conn->ref++; // ref > 1 signed that write is in progress } - tDebug("conn %p read error: %s", conn, uv_err_name(nread)); + tError("server conn %p read error: %s", conn, uv_err_name(nread)); destroyConn(conn, true); } } @@ -300,7 +300,7 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b void uvOnTimeoutCb(uv_timer_t* handle) { // opt SSrvConn* pConn = handle->data; - tDebug("conn %p time out", pConn); + tError("server conn %p time out", pConn); } void uvOnWriteCb(uv_write_t* req, int status) { @@ -312,9 +312,9 @@ void uvOnWriteCb(uv_write_t* req, int status) { transClearBuffer(&conn->readBuf); if (status == 0) { - tDebug("conn %p data already was written on stream", conn); + tTrace("server conn %p data already was written on stream", conn); } else { - tDebug("conn %p failed to write data, %s", conn, uv_err_name(status)); + tError("server conn %p failed to write data, %s", conn, uv_err_name(status)); // destroyConn(conn, true); } @@ -322,7 +322,7 @@ void uvOnWriteCb(uv_write_t* req, int status) { } static void uvOnPipeWriteCb(uv_write_t* req, int status) { if (status == 0) { - tDebug("success to dispatch conn to work thread"); + tTrace("success to dispatch conn to work thread"); } else { tError("fail to dispatch conn to work thread"); } @@ -330,7 +330,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { // impl later; - tDebug("conn %p prepare to send resp", smsg->pConn); + tTrace("server conn %p prepare to send resp", smsg->pConn); SRpcMsg* pMsg = &smsg->msg; SSrvConn* pConn = smsg->pConn; if (pMsg->pCont == 0) { @@ -345,7 +345,7 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { if (transCompressMsg(msg, len, NULL)) { // impl later } - tDebug("%p start to send %s to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), + tDebug("server conn %p %s is sent to %s:%d", pConn, TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port)); pHead->msgLen = htonl(len); @@ -392,7 +392,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q); if (msg == NULL) { - tError("except occurred, continue"); + tError("unexcept occurred, continue"); continue; } if (msg->pConn == NULL) { @@ -430,7 +430,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; - tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); + tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else { uv_close((uv_handle_t*)cli, NULL); @@ -438,7 +438,7 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { - tDebug("connection coming"); + tTrace("server connection coming"); if (nread < 0) { if (nread != UV_EOF) { tError("read error %s", uv_err_name(nread)); @@ -486,10 +486,10 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { uv_os_fd_t fd; uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); - tDebug("conn %p created, fd: %d", pConn, fd); + tTrace("server conn %p created, fd: %d", pConn, fd); int addrlen = sizeof(pConn->addr); if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) { - tError("failed to get peer name"); + tError("server conn %p failed to get peer info", pConn); destroyConn(pConn, true); } else { uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); @@ -562,7 +562,7 @@ void* workerThread(void* arg) { static SSrvConn* createConn() { SSrvConn* pConn = (SSrvConn*)calloc(1, sizeof(SSrvConn)); - tDebug("conn %p created", pConn); + tTrace("conn %p created", pConn); ++pConn->ref; return pConn; } @@ -571,7 +571,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { if (conn == NULL) { return; } - tDebug("conn %p try to destroy", conn); + tTrace("server conn %p try to destroy", conn); if (--conn->ref > 0) { return; } @@ -585,7 +585,7 @@ static void destroyConn(SSrvConn* conn, bool clear) { } static void uvDestroyConn(uv_handle_t* handle) { SSrvConn* conn = handle->data; - tDebug("conn %p destroy", conn); + tDebug("server conn %p destroy", conn); uv_timer_stop(conn->pTimer); free(conn->pTimer); // free(conn->pTcp); @@ -729,7 +729,7 @@ void rpcSendResponse(const SRpcMsg* pMsg) { // QUEUE_PUSH(&pThrd->msg, &srvMsg->q); // pthread_mutex_unlock(&pThrd->msgMtx); - tTrace("conn %p start to send resp", pConn); + tTrace("server conn %p start to send resp", pConn); transSendAsync(pThrd->asyncPool, &srvMsg->q); // uv_async_send(pThrd->workerAsync); } -- GitLab