diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 4ffaad3f84a5fab444aa0d72fe4c160019b68375..d64df9b0f3aea486560479f55a02274d3ff47162 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -95,8 +95,10 @@ static void clientHandleExcept(SCliConn* conn); // handle req from app static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); -static void clientMsgDestroy(SCliMsg* pMsg); -static void destroyTransConnCtx(STransConnCtx* ctx); +static void destroyUserdata(SRpcMsg* userdata); + +static void destroyCmsg(SCliMsg* cmsg); +static void transDestroyConnCtx(STransConnCtx* ctx); // thread obj static SCliThrdObj* createThrdObj(); static void destroyThrdObj(SCliThrdObj* pThrd); @@ -104,7 +106,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd); static void* clientThread(void* arg); static void clientHandleResp(SCliConn* conn) { - STransConnCtx* pCtx = ((SCliMsg*)conn->data)->ctx; + SCliMsg* pMsg = conn->data; + STransConnCtx* pCtx = pMsg->ctx; SRpcInfo* pRpc = pCtx->pTransInst; STransMsgHead* pHead = (STransMsgHead*)(conn->readBuf.buf); @@ -122,41 +125,44 @@ static void clientHandleResp(SCliConn* conn) { (pRpc->cfp)(NULL, &rpcMsg, NULL); conn->notifyCount += 1; - SCliThrdObj* pThrd = conn->hostThrd; - tfree(conn->data); - // buf alread translated to rpcMsg.pCont + // buf's mem alread translated to rpcMsg.pCont transClearBuffer(&conn->readBuf); uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); + + SCliThrdObj* pThrd = conn->hostThrd; addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); + destroyCmsg(pMsg); + conn->data = NULL; // start thread's timer of conn pool if not active if (!uv_is_active((uv_handle_t*)pThrd->pTimer) && pRpc->idleTime > 0) { uv_timer_start((uv_timer_t*)pThrd->pTimer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } - destroyTransConnCtx(pCtx); } static void clientHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { + // handle conn except in conn pool clientConnDestroy(pConn, true); return; } - tDebug("conn %p destroy", pConn); + tDebug("conn %p start to destroy", pConn); SCliMsg* pMsg = pConn->data; - transFreeMsg((pMsg->msg.pCont)); - pMsg->msg.pCont = NULL; + + destroyUserdata(&pMsg->msg); STransConnCtx* pCtx = pMsg->ctx; - SRpcInfo* pRpc = pCtx->pTransInst; SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; rpcMsg.code = -1; // SRpcInfo* pRpc = pMsg->ctx->pRpc; - (pRpc->cfp)(NULL, &rpcMsg, NULL); - tfree(pConn->data); + (pCtx->pTransInst->cfp)(NULL, &rpcMsg, NULL); pConn->notifyCount += 1; - destroyTransConnCtx(pCtx); + + destroyCmsg(pMsg); + pConn->data = NULL; + // transDestroyConnCtx(pCtx); clientConnDestroy(pConn, true); } @@ -236,6 +242,7 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { tDebug("conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); SRpcInfo* pRpc = ((SCliThrdObj*)conn->hostThrd)->pTransInst; + conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); conn->notifyCount = 0; @@ -282,6 +289,9 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf } assert(nread <= 0); if (nread == 0) { + // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb + // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under + // read(2). return; } if (nread < 0 || nread == UV_EOF) { @@ -321,11 +331,12 @@ static void clientWriteCb(uv_write_t* req, int status) { if (status == 0) { tDebug("conn %p data already was written out", pConn); SCliMsg* pMsg = pConn->data; - if (pMsg != NULL) { - transFreeMsg((pMsg->msg.pCont)); - pMsg->msg.pCont = NULL; + if (pMsg == NULL) { + destroy + // handle + return; } - + destroyUserdata(&pMsg->msg); } else { tError("conn %p failed to write: %s", pConn, uv_err_name(status)); clientHandleExcept(pConn); @@ -453,8 +464,20 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, } return cli; } -static void clientMsgDestroy(SCliMsg* pMsg) { - // impl later + +static void destroyUserdata(SRpcMsg* userdata) { + if (userdata->pCont == NULL) { + return; + } + transFreeMsg(userdata->pCont); + userdata->pCont = NULL; +} +static void destroyCmsg(SCliMsg* pMsg) { + if (pMsg == NULL) { + return; + } + transDestroyConnCtx(pMsg->ctx); + destroyUserdata(&pMsg->msg); free(pMsg); } static SCliThrdObj* createThrdObj() { @@ -487,7 +510,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { free(pThrd); } -static void destroyTransConnCtx(STransConnCtx* ctx) { +static void transDestroyConnCtx(STransConnCtx* ctx) { if (ctx != NULL) { free(ctx->ip); } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 485ed904e83f2000a9b9a0d552a0022e3babb004..f36d9bd4937221f2d84e97103f3a8f42eef90b52 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -91,7 +91,7 @@ static void uvWorkerAsyncCb(uv_async_t* handle); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSrvMsg* msg); -static void destroySrvMsg(SSrvConn* conn); +static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static bool readComplete(SConnBuffer* buf); static SSrvConn* createConn(); @@ -305,8 +305,10 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnWriteCb(uv_write_t* req, int status) { SSrvConn* conn = req->data; - SSrvMsg* smsg = conn->pSrvMsg; - destroySrvMsg(conn); + + SSrvMsg* smsg = conn->pSrvMsg; + destroySmsg(smsg); + conn->pSrvMsg = NULL; transClearBuffer(&conn->readBuf); if (status == 0) { @@ -362,14 +364,12 @@ static void uvStartSendResp(SSrvMsg* smsg) { return; } -static void destroySrvMsg(SSrvConn* conn) { - SSrvMsg* smsg = conn->pSrvMsg; +static void destroySmsg(SSrvMsg* smsg) { if (smsg == NULL) { return; } transFreeMsg(smsg->msg.pCont); - free(conn->pSrvMsg); - conn->pSrvMsg = NULL; + free(smsg); } void uvWorkerAsyncCb(uv_async_t* handle) { SWorkThrdObj* pThrd = handle->data; @@ -555,7 +555,8 @@ static void destroyConn(SSrvConn* conn, bool clear) { return; } transDestroyBuffer(&conn->readBuf); - destroySrvMsg(conn); + destroySmsg(conn->pSrvMsg); + conn->pSrvMsg = NULL; if (clear) { uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 12d8a01819658174e676399e26331d053b79440c..d1a587f4e5539a45001cafa28e58f134f9bcdad3 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -165,6 +165,7 @@ int main(int argc, char *argv[]) { tError("failed to start RPC server"); return -1; } + // sleep(5); tInfo("RPC server is running, ctrl-c to exit"); @@ -172,7 +173,6 @@ int main(int argc, char *argv[]) { dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO); if (dataFd < 0) tInfo("failed to open data file, reason:%s", strerror(errno)); } - qhandle = taosOpenQueue(); qset = taosOpenQset(); taosAddIntoQset(qset, qhandle, NULL);