From 64d979fd5c652b3e9d3fdf412904ea40e3684a9c Mon Sep 17 00:00:00 2001 From: ubuntu Date: Sat, 12 Mar 2022 19:07:53 +0800 Subject: [PATCH] update transport --- include/libs/transport/trpc.h | 7 +++ source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/transCli.c | 59 +++++++++++++++--------- source/libs/transport/src/transSrv.c | 2 +- 4 files changed, 47 insertions(+), 22 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index fdc9368b76..8dfd736df6 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -71,6 +71,10 @@ typedef struct SRpcInit { // call back to keep conn or not bool (*pfp)(void *parent, tmsg_t msgType); + // to support Send messages multiple times on a link + // + void* (*mfp)(void *parent, tmsg_t msgType); + void *parent; } SRpcInit; @@ -89,6 +93,9 @@ void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) int rpcReportProgress(void *pConn, char *pCont, int contLen); void rpcCancelRequest(int64_t rid); +// just release client conn to rpc instance, no close sock +void rpcReleaseHandle(void *handle); + void rpcRefHandle(void *handle, int8_t type); void rpcUnrefHandle(void *handle, int8_t type); diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 4e4dcf7aa4..3924a5cf1a 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -64,6 +64,7 @@ typedef struct { void (*cfp)(void* parent, SRpcMsg*, SEpSet*); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); bool (*pfp)(void* parent, tmsg_t msgType); + void* (*mfp)(void* parent, tmsg_t msgType); int32_t refCount; void* parent; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 727845b7a9..ce3c1c2dc8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -135,13 +135,12 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0); +#define CONN_PERSIST_BY_APP(conn) do { if (conn->persist == false) { conn->persist = true; transRefCliHandle(conn);}} while(0) +#define CONN_NO_PERSIST_BY_APP(conn) ((conn)->persist == false) + static void* cliWorkThread(void* arg); -static void* cliNotifyApp() {} static void cliHandleResp(SCliConn* conn) { - SCliMsg* pMsg = conn->data; - STransConnCtx* pCtx = pMsg->ctx; - SCliThrdObj* pThrd = conn->hostThrd; SRpcInfo* pTransInst = pThrd->pTransInst; @@ -157,14 +156,24 @@ static void cliHandleResp(SCliConn* conn) { rpcMsg.pCont = transContFromHead((char*)pHead); rpcMsg.code = pHead->code; rpcMsg.msgType = pHead->msgType; - rpcMsg.ahandle = pCtx->ahandle; + rpcMsg.ahandle = NULL; - if (pTransInst->pfp != NULL && (pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { - rpcMsg.handle = conn; - transRefCliHandle(conn); + SCliMsg* pMsg = conn->data; + STransConnCtx *pCtx = pMsg ? pMsg->ctx : NULL; + if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(conn)) { + rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; + } else { + rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + } + //if (rpcMsg.ahandle == NULL) { + // tDebug("%s cli conn %p handle except", CONN_GET_INST_LABEL(conn), conn); + // return; + //} - conn->persist = 1; - tDebug("cli conn %p persist by app", conn); + if (pTransInst->pfp != NULL && (*pTransInst->pfp)(pTransInst->parent, rpcMsg.msgType)) { + rpcMsg.handle = conn; + CONN_PERSIST_BY_APP(conn); + tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, @@ -173,7 +182,7 @@ static void cliHandleResp(SCliConn* conn) { conn->secured = pHead->secured; - if (pCtx->pSem == NULL) { + if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, conn); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { @@ -184,8 +193,7 @@ static void cliHandleResp(SCliConn* conn) { uv_read_start((uv_stream_t*)conn->stream, cliAllocBufferCb, cliRecvCb); - // user owns conn->persist = 1 - if (conn->persist == 0) { + if (CONN_NO_PERSIST_BY_APP(conn)) { addConnToPool(pThrd->pool, pCtx->ip, pCtx->port, conn); } destroyCmsg(conn->data); @@ -198,22 +206,29 @@ static void cliHandleResp(SCliConn* conn) { } static void cliHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { - // handle conn except in conn pool - transUnrefCliHandle(pConn); - return; + if (pConn->broken == true || CONN_NO_PERSIST_BY_APP(pConn)) { + transUnrefCliHandle(pConn); + return; + } } SCliThrdObj* pThrd = pConn->hostThrd; SRpcInfo* pTransInst = pThrd->pTransInst; SCliMsg* pMsg = pConn->data; - STransConnCtx* pCtx = pMsg->ctx; + STransConnCtx *pCtx = pMsg ? pMsg->ctx : NULL; SRpcMsg rpcMsg = {0}; - rpcMsg.ahandle = pCtx->ahandle; rpcMsg.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - rpcMsg.msgType = pMsg->msg.msgType + 1; + rpcMsg.msgType = pMsg ? pMsg->msg.msgType + 1 : 0; + rpcMsg.ahandle = NULL; + + if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { + rpcMsg.ahandle = pTransInst->mfp ? (*pTransInst->mfp)(pTransInst->parent, rpcMsg.msgType) : NULL; + } else { + rpcMsg.ahandle = pCtx ? pCtx->ahandle : NULL; + } - if (pCtx->pSem == NULL) { + if (pCtx == NULL || pCtx->pSem == NULL) { tTrace("%s cli conn %p handle resp", pTransInst->label, pConn); (pTransInst->cfp)(pTransInst->parent, &rpcMsg, NULL); } else { @@ -358,6 +373,7 @@ static SCliConn* cliCreateConn(SCliThrdObj* pThrd) { QUEUE_INIT(&conn->conn); conn->hostThrd = pThrd; + conn->persist = false; conn->broken = false; transRefCliHandle(conn); return conn; @@ -476,7 +492,6 @@ static SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = NULL; if (pMsg->msg.handle != NULL) { conn = (SCliConn*)(pMsg->msg.handle); - transUnrefCliHandle(conn); if (conn != NULL) { tTrace("%s cli conn %p reused", CONN_GET_INST_LABEL(conn), conn); } @@ -514,6 +529,7 @@ static void cliHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tTrace("%s cli conn %p try to connect to %s:%d", pTransInst->label, conn, pMsg->ctx->ip, pMsg->ctx->port); uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); } + conn->hThrdIdx = pCtx->hThrdIdx; } static void cliAsyncCb(uv_async_t* handle) { @@ -683,6 +699,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* if (transCompressMsg(pMsg->pCont, pMsg->contLen, &flen)) { // imp later } + tDebug("send request at thread:%d %p", index, pMsg); STransConnCtx* pCtx = calloc(1, sizeof(STransConnCtx)); pCtx->ahandle = pMsg->ahandle; pCtx->msgType = pMsg->msgType; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 1abca9ad97..c4c3d9ed0a 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -260,7 +260,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } tError("server conn %p read error: %s", conn, uv_err_name(nread)); - if (nread < 0 || nread == UV_EOF) { + if (nread < 0) { conn->broken = true; transUnrefSrvHandle(conn); -- GitLab