From a0804f191af8f2f963aade10c0ff0f9454a635ea Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 9 Feb 2022 21:44:42 +0800 Subject: [PATCH] fix crash --- source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 40 ++++++++++++++-------------- source/libs/transport/src/transSrv.c | 6 ++++- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index a6040a3873..5fc937bccd 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + pRpc->parent = pInit->parent; return pRpc; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3d93049c6a..f5eeae26e6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -124,10 +124,10 @@ static void clientHandleResp(SCliConn* conn) { rpcMsg.msgType = pHead->msgType; rpcMsg.ahandle = pCtx->ahandle; if (pCtx->pSem == NULL) { - tDebug("conn %p handle resp", conn); - (pRpc->cfp)(NULL, &rpcMsg, NULL); + tDebug("client conn %p handle resp, ", conn); + (pRpc->cfp)(pRpc->parent, &rpcMsg, NULL); } else { - tDebug("conn %p handle resp", conn); + tDebug("client conn(sync) %p handle resp", conn); memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); tsem_post(pCtx->pSem); } @@ -154,7 +154,7 @@ static void clientHandleExcept(SCliConn* pConn) { clientConnDestroy(pConn, true); return; } - tDebug("conn %p start to destroy", pConn); + tDebug("client conn %p start to destroy", pConn); SCliMsg* pMsg = pConn->data; destroyUserdata(&pMsg->msg); @@ -166,7 +166,7 @@ static void clientHandleExcept(SCliConn* pConn) { rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; if (pCtx->pSem == NULL) { // SRpcInfo* pRpc = pMsg->ctx->pRpc; - (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); + (pCtx->pTransInst->cfp)(pCtx->pTransInst->parent, &rpcMsg, NULL); } else { memcpy((char*)(pCtx->pRsp), (char*)(&rpcMsg), sizeof(rpcMsg)); // SRpcMsg rpcMsg @@ -184,7 +184,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliThrdObj* pThrd = handle->data; SRpcInfo* pRpc = pThrd->pTransInst; int64_t currentTime = pThrd->nextTimeout; - tDebug("timeout, try to remove expire conn from conn pool"); + tDebug("client conn timeout, try to remove expire conn from conn pool"); SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); while (p != NULL) { @@ -253,7 +253,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { tstrncpy(key, ip, strlen(ip)); tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); - tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); + tDebug("client conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; @@ -294,10 +294,10 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf pBuf->len += nread; if (clientReadComplete(pBuf)) { uv_read_stop((uv_stream_t*)conn->stream); - tDebug("conn %p read complete", conn); + tDebug("client conn %p read complete", conn); clientHandleResp(conn); } else { - tDebug("conn %p read partial packet, continue to read", conn); + tDebug("client conn %p read partial packet, continue to read", conn); } return; } @@ -309,7 +309,7 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf return; } if (nread < 0 || nread == UV_EOF) { - tError("conn %p read error: %s", conn, uv_err_name(nread)); + tError("client conn %p read error: %s", conn, uv_err_name(nread)); clientHandleExcept(conn); } // tDebug("Read error %s\n", uv_err_name(nread)); @@ -320,9 +320,9 @@ static void clientConnDestroy(SCliConn* conn, bool clear) { // conn->ref--; if (conn->ref == 0) { - tDebug("conn %p remove from conn pool", conn); + tDebug("client conn %p remove from conn pool", conn); QUEUE_REMOVE(&conn->conn); - tDebug("conn %p remove from conn pool successfully", conn); + tDebug("client conn %p remove from conn pool successfully", conn); if (clear) { uv_close((uv_handle_t*)conn->stream, clientDestroy); } @@ -334,7 +334,7 @@ static void clientDestroy(uv_handle_t* handle) { free(conn->stream); free(conn->writeReq); - tDebug("conn %p destroy successfully", conn); + tDebug("client conn %p destroy successfully", conn); free(conn); // clientConnDestroy(conn, false); @@ -343,7 +343,7 @@ static void clientDestroy(uv_handle_t* handle) { static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; if (status == 0) { - tDebug("conn %p data already was written out", pConn); + tDebug("client conn %p data already was written out", pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { // handle @@ -351,7 +351,7 @@ static void clientWriteCb(uv_write_t* req, int status) { } destroyUserdata(&pMsg->msg); } else { - tError("conn %p failed to write: %s", pConn, uv_err_name(status)); + tError("client conn %p failed to write: %s", pConn, uv_err_name(status)); clientHandleExcept(pConn); return; } @@ -370,7 +370,7 @@ static void clientWrite(SCliConn* pConn) { pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("conn %p data write out, msgType : %d, len: %d", pConn, pHead->msgType, msgLen); + tDebug("client conn %p data write out, msgType : %s, len: %d", pConn, TMSG_INFO(pHead->msgType), msgLen); uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { @@ -378,11 +378,11 @@ static void clientConnCb(uv_connect_t* req, int status) { SCliConn* pConn = req->data; if (status != 0) { // tError("failed to connect server(%s, %d), errmsg: %s", pCtx->ip, pCtx->port, uv_strerror(status)); - tError("conn %p failed to connect server: %s", pConn, uv_strerror(status)); + tError("client conn %p failed to connect server: %s", pConn, uv_strerror(status)); clientHandleExcept(pConn); return; } - tDebug("conn %p create", pConn); + tDebug("client conn %p create", pConn); assert(pConn->stream == req->handle); clientWrite(pConn); @@ -400,14 +400,14 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { uint64_t et = taosGetTimestampUs(); uint64_t el = et - pMsg->st; - tDebug("msg tran time cost: %" PRIu64 "", el); + tDebug("client msg tran time cost: %" PRIu64 "", el); et = taosGetTimestampUs(); STransConnCtx* pCtx = pMsg->ctx; SCliConn* conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { // impl later - tDebug("conn %p get from conn pool", conn); + tDebug("client get conn %p from pool", conn); conn->data = pMsg; conn->writeReq->data = conn; transDestroyBuffer(&conn->readBuf); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 4d2ac434dd..f3cfd25408 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -266,6 +266,7 @@ static void uvHandleReq(SSrvConn* pConn) { transClearBuffer(&pConn->readBuf); pConn->ref++; + tDebug("%s received on %p", TMSG_INFO(rpcMsg.msgType), pConn); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth @@ -278,7 +279,7 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; - tDebug("conn %p read summroy, total read: %d, current read: %d", conn, pBuf->len, (int)nread); + tDebug("conn %p read summary, total read: %d, current read: %d", conn, pBuf->len, (int)nread); if (readComplete(pBuf)) { tDebug("conn %p alread read complete packet", conn); uvHandleReq(conn); @@ -717,6 +718,9 @@ void taosCloseServer(void* arg) { } void rpcSendResponse(const SRpcMsg* pMsg) { + if (pMsg->handle == NULL) { + return; + } SSrvConn* pConn = pMsg->handle; SWorkThrdObj* pThrd = pConn->hostThrd; -- GitLab