From cc5ae34d6e7d3fe6158468f14263f2cc04dcdd0b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 16 Feb 2022 14:41:05 +0800 Subject: [PATCH] fix conn persist bug --- source/libs/transport/inc/transComm.h | 1 + source/libs/transport/src/transCli.c | 23 +++++++++-------------- source/libs/transport/test/rclient.c | 2 +- 3 files changed, 11 insertions(+), 15 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 6f8da57ee7..d5a8cf5f84 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -140,6 +140,7 @@ typedef struct { SRpcMsg* pRsp; // for synchronous API tsem_t* pSem; // for synchronous API + int hThrdIdx; char* ip; uint32_t port; // SEpSet* pSet; // for synchronous API diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1e96459166..9cab863ed7 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -17,7 +17,7 @@ #include "transComm.h" -#define CONN_HOST_THREAD_INDEX(conn ? ((SCliConn*)conn)->hThrdIdx : -1) +#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) typedef struct SCliConn { @@ -29,8 +29,8 @@ typedef struct SCliConn { void* data; queue conn; uint64_t expireTime; - int8_t ctnRdCnt; // timers already notify to client - int hostThreadIndex; + int8_t ctnRdCnt; // continue read count + int hThrdIdx; SRpcPush* push; int persist; // @@ -482,12 +482,12 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (pMsg->msg.handle == NULL) { conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { - tTrace("client conn %d get from conn pool", conn); + tTrace("client conn %p get from conn pool", conn); } } else { conn = (SCliConn*)(pMsg->msg.handle); if (conn != NULL) { - tTrace("client conn %d reused", conn); + tTrace("client conn %p reused", conn); } } @@ -503,7 +503,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { clientWrite(conn); } else { - SCliConn* conn = calloc(1, sizeof(SCliConn)); + conn = calloc(1, sizeof(SCliConn)); conn->ref++; // read/write stream handle conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); @@ -652,15 +652,9 @@ static void clientSendQuit(SCliThrdObj* thrd) { SCliMsg* msg = calloc(1, sizeof(SCliMsg)); msg->ctx = NULL; // - // pthread_mutex_lock(&thrd->msgMtx); - // QUEUE_PUSH(&thrd->msg, &msg->q); - // pthread_mutex_unlock(&thrd->msgMtx); - transSendAsync(thrd->asyncPool, &msg->q); - // uv_async_send(thrd->cliAsync); } void taosCloseClient(void* arg) { - // impl later SClientObj* cli = arg; for (int i = 0; i < cli->numOfThreads; i++) { clientSendQuit(cli->pThreadObj[i]); @@ -683,7 +677,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* SRpcInfo* pRpc = (SRpcInfo*)shandle; - int index = CONN_HOST_THREAD_INDEX(pMsg.handle); + int index = CONN_HOST_THREAD_INDEX(pMsg->handle); if (index == -1) { index = clientRBChoseIdx(pRpc); } @@ -717,7 +711,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { SRpcInfo* pRpc = (SRpcInfo*)shandle; - int index = CONN_HOST_THREAD_INDEX(pMsg.handle); + int index = CONN_HOST_THREAD_INDEX(pReq->handle); if (index == -1) { index = clientRBChoseIdx(pRpc); } @@ -728,6 +722,7 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { pCtx->msgType = pReq->msgType; pCtx->ip = strdup(ip); pCtx->port = port; + pCtx->hThrdIdx = index; pCtx->pSem = calloc(1, sizeof(tsem_t)); pCtx->pRsp = pRsp; tsem_init(pCtx->pSem, 0, 0); diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 4e29c02508..cc6a63d3cd 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -96,7 +96,7 @@ static void *sendRequest(void *param) { int main(int argc, char *argv[]) { SRpcInit rpcInit; - SEpSet epSet; + SEpSet epSet = {0}; int msgSize = 128; int numOfReqs = 0; int appThreads = 1; -- GitLab