From 62700c1f1307c94f0c0b7a0d34ee6d21620c2446 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 11 Mar 2022 21:39:16 +0800 Subject: [PATCH] handle persist conn --- include/libs/transport/trpc.h | 2 - source/libs/transport/src/transCli.c | 89 +++++++++++++++---------- source/libs/transport/src/transSrv.c | 14 ++-- source/libs/transport/test/pushClient.c | 2 +- 4 files changed, 61 insertions(+), 46 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 6ccb6c0dc4..feb493b50f 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -47,8 +47,6 @@ typedef struct SRpcMsg { void * ahandle; // app handle set by client int persist; // keep handle or not, default 0 - SRpcPush *push; - } SRpcMsg; typedef struct SRpcPush { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1323677071..c2d5feccaf 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,11 +17,6 @@ #include "transComm.h" -#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) -#define CONN_PERSIST_TIME(para) (para * 1000 * 10) - -#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) - typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -32,7 +27,6 @@ typedef struct SCliConn { void* data; queue conn; uint64_t expireTime; - int8_t ctnRdCnt; // continue read count int hThrdIdx; bool broken; // link broken or not @@ -92,9 +86,9 @@ static void clientTimeoutCb(uv_timer_t* handle); // alloc buf for read static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // callback after read nbytes from socket -static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void clientRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); // callback after write data to socket -static void clientWriteCb(uv_write_t* req, int status); +static void clientSendDataCb(uv_write_t* req, int status); // callback after conn to server static void clientConnCb(uv_connect_t* req, int status); static void clientAsyncCb(uv_async_t* handle); @@ -120,7 +114,27 @@ static void transDestroyConnCtx(STransConnCtx* ctx); // thread obj static SCliThrdObj* createThrdObj(); static void destroyThrdObj(SCliThrdObj* pThrd); -// thread + +#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) +#define CONN_PERSIST_TIME(para) (para * 1000 * 10) + +#define CONN_GET_INST_LABEL(conn) (((SRpcInfo*)(((SCliThrdObj*)conn->hostThrd)->pTransInst))->label) +#define CONN_HANDLE_THREAD_QUIT(conn, thrd) \ + do { \ + if (thrd->quit) { \ + clientHandleExcept(conn); \ + } \ + goto _RETURE; \ + } while (0) + +#define CONN_HANDLE_BROKEN(conn) \ + do { \ + if (conn->broken) { \ + clientHandleExcept(conn); \ + } \ + goto _RETURE; \ + } while (0); + static void* clientThread(void* arg); static void* clientNotifyApp() {} @@ -147,6 +161,8 @@ static void clientHandleResp(SCliConn* conn) { if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { rpcMsg.handle = conn; + transRefCliHandle(conn); + conn->persist = 1; tDebug("client conn %p persist by app", conn); } @@ -165,9 +181,8 @@ static void clientHandleResp(SCliConn* conn) { memcpy((char*)pCtx->pRsp, (char*)&rpcMsg, sizeof(rpcMsg)); tsem_post(pCtx->pSem); } - conn->ctnRdCnt += 1; - uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientReadCb); + uv_read_start((uv_stream_t*)conn->stream, clientAllocBufferCb, clientRecvCb); // user owns conn->persist = 1 if (conn->persist == 0) { @@ -290,7 +305,6 @@ static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn) { conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - conn->ctnRdCnt = 0; // list already create before assert(plist != NULL); QUEUE_PUSH(&plist->conn, &conn->conn); @@ -300,7 +314,7 @@ static void clientAllocBufferCb(uv_handle_t* handle, size_t suggested_size, uv_b SConnBuffer* pBuf = &conn->readBuf; transAllocBuffer(pBuf, buf); } -static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { +static void clientRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // impl later if (handle->data == NULL) { return; @@ -363,7 +377,7 @@ static void clientDestroy(uv_handle_t* handle) { free(conn); } -static void clientWriteCb(uv_write_t* req, int status) { +static void clientSendDataCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; if (status == 0) { @@ -378,10 +392,12 @@ static void clientWriteCb(uv_write_t* req, int status) { clientHandleExcept(pConn); return; } - uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb); + uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientRecvCb); } -static void clientWrite(SCliConn* pConn) { +static void clientSendData(SCliConn* pConn) { + CONN_HANDLE_BROKEN(pConn); + SCliMsg* pCliMsg = pConn->data; STransConnCtx* pCtx = pCliMsg->ctx; @@ -420,7 +436,11 @@ static void clientWrite(SCliConn* pConn) { TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); - uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); + uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientSendDataCb); + + return; +_RETURE: + return; } static void clientConnCb(uv_connect_t* req, int status) { // impl later @@ -439,7 +459,7 @@ static void clientConnCb(uv_connect_t* req, int status) { tTrace("%s client conn %p connect to server successfully", CONN_GET_INST_LABEL(pConn), pConn); assert(pConn->stream == req->handle); - clientWrite(pConn); + clientSendData(pConn); } static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { @@ -452,35 +472,34 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { pThrd->quit = true; uv_stop(pThrd->loop); } -static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { - uint64_t et = taosGetTimestampUs(); - uint64_t el = et - pMsg->st; - tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); - - STransConnCtx* pCtx = pMsg->ctx; - SRpcInfo* pTransInst = pThrd->pTransInst; - SCliConn* conn = NULL; - +static SCliConn* clientGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { + SCliConn* conn = NULL; if (pMsg->msg.handle != NULL) { conn = (SCliConn*)(pMsg->msg.handle); + transUnrefCliHandle(conn); if (conn != NULL) { tTrace("%s client conn %p reused", CONN_GET_INST_LABEL(conn), conn); } } else { + STransConnCtx* pCtx = pMsg->ctx; conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) tTrace("%s client conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); } + return conn; +} +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { + uint64_t et = taosGetTimestampUs(); + uint64_t el = et - pMsg->st; + tTrace("%s client msg tran time cost: %" PRIu64 "us", ((SRpcInfo*)pThrd->pTransInst)->label, el); + + STransConnCtx* pCtx = pMsg->ctx; + SRpcInfo* pTransInst = pThrd->pTransInst; + SCliConn* conn = clientGetConn(pMsg, pThrd); if (conn != NULL) { conn->data = pMsg; - conn->writeReq.data = conn; transDestroyBuffer(&conn->readBuf); - - if (pThrd->quit) { - clientHandleExcept(conn); - return; - } - clientWrite(conn); + clientSendData(conn); } else { conn = clientConnCreate(pThrd); conn->data = pMsg; @@ -495,8 +514,6 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("%s client conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); } - - conn->ctnRdCnt = 0; conn->hThrdIdx = pCtx->hThrdIdx; } static void clientAsyncCb(uv_async_t* handle) { diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 593d0acfc5..1abca9ad97 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -92,9 +92,9 @@ static int uvAuthMsg(SSrvConn* pConn, char* msg, int msgLen); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnTimeoutCb(uv_timer_t* handle); -static void uvOnWriteCb(uv_write_t* req, int status); +static void uvOnSendCb(uv_write_t* req, int status); static void uvOnPipeWriteCb(uv_write_t* req, int status); static void uvOnAcceptCb(uv_stream_t* stream, int status); static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); @@ -240,7 +240,7 @@ static void uvHandleReq(SSrvConn* pConn) { // validate msg type } -void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { +void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt SSrvConn* conn = cli->data; SConnBuffer* pBuf = &conn->readBuf; @@ -282,7 +282,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) { tError("server conn %p time out", pConn); } -void uvOnWriteCb(uv_write_t* req, int status) { +void uvOnSendCb(uv_write_t* req, int status) { SSrvConn* conn = req->data; transClearBuffer(&conn->readBuf); if (status == 0) { @@ -350,7 +350,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { SSrvConn* pConn = smsg->pConn; uv_timer_stop(&pConn->pTimer); - uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); + uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } static void uvStartSendResp(SSrvMsg* smsg) { // impl @@ -526,7 +526,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { return; } - uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); + uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnRecvCb); } else { tDebug("failed to create new connection"); @@ -641,7 +641,7 @@ static void uvDestroyConn(uv_handle_t* handle) { uv_timer_stop(&conn->pTimer); QUEUE_REMOVE(&conn->queue); free(conn->pTcp); - // free(conn); + free(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { uv_loop_close(thrd->loop); diff --git a/source/libs/transport/test/pushClient.c b/source/libs/transport/test/pushClient.c index f4babc9980..dc9914be35 100644 --- a/source/libs/transport/test/pushClient.c +++ b/source/libs/transport/test/pushClient.c @@ -82,7 +82,7 @@ static void *sendRequest(void *param) { rpcMsg.contLen = pInfo->msgSize; rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; - rpcMsg.push = push; + // rpcMsg.push = push; // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); int64_t start = taosGetTimestampUs(); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); -- GitLab