diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 842b5a1b4b7839ce3edcf63e8cb2cd9fc9fc445a..46658e935283b754677eeab4fff17fc184ed6681 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -138,11 +138,12 @@ static void destroyThrdObj(SCliThrdObj* pThrd); conn->status = ConnRelease; \ transClearBuffer(&conn->readBuf); \ transFreeMsg(transContFromHead((char*)head)); \ + tDebug("cli conn %p receive release request", conn); \ if (T_REF_VAL_GET(conn) == 1) { \ SCliThrdObj* thrd = conn->hostThrd; \ addConnToPool(thrd->pool, conn); \ } \ - goto _RETURN; \ + return; \ } \ } while (0) @@ -150,7 +151,7 @@ static void destroyThrdObj(SCliThrdObj* pThrd); do { \ if (thrd->quit) { \ cliHandleExcept(conn); \ - goto _RETURE; \ + return; \ } \ } while (0) @@ -158,9 +159,9 @@ static void destroyThrdObj(SCliThrdObj* pThrd); do { \ if (conn->broken) { \ cliHandleExcept(conn); \ - goto _RETURE; \ + return; \ } \ - } while (0); + } while (0) #define CONN_SET_PERSIST_BY_APP(conn) \ do { \ @@ -388,13 +389,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->status = ConnNormal; // list already create before assert(plist != NULL); + taosArrayClear(conn->cliMsgs); QUEUE_PUSH(&plist->conn, &conn->conn); + assert(!QUEUE_IS_EMPTY(&plist->conn)); } static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; - // avoid conn - QUEUE_REMOVE(&conn->conn); transAllocBuffer(pBuf, buf); } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { @@ -420,6 +421,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { // ref http://docs.libuv.org/en/v1.x/stream.html?highlight=uv_read_start#c.uv_read_cb // nread might be 0, which does not indicate an error or EOF. This is equivalent to EAGAIN or EWOULDBLOCK under // read(2). + tTrace("%s cli conn %p read empty", CONN_GET_INST_LABEL(conn), conn); return; } if (nread < 0) { @@ -555,8 +557,6 @@ void cliSend(SCliConn* pConn) { pConn->writeReq.data = pConn; uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); - return; -_RETURE: return; } @@ -594,6 +594,7 @@ static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = pMsg->msg.handle; tDebug("%s cli conn %p start to release to inst", CONN_GET_INST_LABEL(conn), conn); + transUnrefCliHandle(conn); taosArrayPush(conn->cliMsgs, &pMsg); if (taosArrayGetSize(conn->cliMsgs) >= 2) { return; // send one by one @@ -613,6 +614,8 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrdObj* pThrd) { conn = getConnFromPool(pThrd->pool, pCtx->ip, pCtx->port); if (conn != NULL) { tTrace("%s cli conn %p get from conn pool", CONN_GET_INST_LABEL(conn), conn); + } else { + tTrace("not found conn in conn pool %p", pThrd->pool); } } return conn; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 108b12542c1b89f5672f60f5242d8f98cd6baf70..063088aeb48ccd7231fcf1c316c60e2a44198275 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -99,7 +99,19 @@ static const char* notify = "a"; conn->status = ConnRelease; \ transClearBuffer(&conn->readBuf); \ transFreeMsg(transContFromHead((char*)head)); \ - goto _RETURE; \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.handle = (void*)conn, .code = 0}; \ + SSrvMsg* srvMsg = calloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + taosArrayPush(conn->srvMsgs, &srvMsg); \ + if (taosArrayGetSize(conn->srvMsgs) > 1) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ } \ } while (0) // refactor later @@ -242,6 +254,7 @@ static void uvHandleReq(SSrvConn* pConn) { pHead->msgLen -= sizeof(STransUserMsg); } } + CONN_SHOULD_RELEASE(pConn, pHead); STransMsg transMsg; @@ -280,8 +293,6 @@ static void uvHandleReq(SSrvConn* pConn) { (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth -_RETURE: - return; } void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { @@ -798,11 +809,10 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { // release handle to rpc init SSrvConn* conn = msg->pConn; if (conn->status == ConnAcquire) { - if (taosArrayGetSize(conn->srvMsgs) > 0) { - taosArrayPush(conn->srvMsgs, &msg); + taosArrayPush(conn->srvMsgs, &msg); + if (taosArrayGetSize(conn->srvMsgs) > 1) { return; } - taosArrayPush(conn->srvMsgs, &msg); uvStartSendRespInternal(msg); return; } else if (conn->status == ConnRelease) { diff --git a/source/libs/transport/test/transUT.cc b/source/libs/transport/test/transUT.cc index deccd633d8a0e929a0a72fbfdaf07170814a6d7e..69a645e0a77bf2a76c700e67d9185d19a5c79e4e 100644 --- a/source/libs/transport/test/transUT.cc +++ b/source/libs/transport/test/transUT.cc @@ -30,24 +30,8 @@ const char *ckey = "ckey"; class Server; int port = 7000; // server process - -typedef struct CbArgs { - tmsg_t msgType; -} CbArgs; - -static void *ConstructArgForSpecificMsgType(void *parent, tmsg_t msgType) { - if (msgType == 1 || msgType == 2) { - CbArgs *args = (CbArgs *)calloc(1, sizeof(CbArgs)); - args->msgType = msgType; - return args; - } - return NULL; -} // server except -static bool handleExcept(void *parent, tmsg_t msgType) { - // - return msgType == TDMT_VND_QUERY || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP; -} + typedef void (*CB)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet); @@ -86,10 +70,6 @@ class Client { rpcClose(this->transCli); this->transCli = NULL; } - void SetConstructFP(void *(*mfp)(void *parent, tmsg_t msgType)) { - rpcClose(this->transCli); - this->transCli = rpcOpen(&rpcInit_); - } void SendAndRecv(SRpcMsg *req, SRpcMsg *resp) { SEpSet epSet = {0}; @@ -108,7 +88,6 @@ class Client { SendAndRecv(req, resp); } - void SendWithHandle(SRpcMsg *req, SRpcMsg *resp) {} void SemWait() { tsem_wait(&this->sem); } void SemPost() { tsem_post(&this->sem); } void Reset() {} @@ -141,12 +120,17 @@ class Server { this->transSrv = rpcOpen(&this->rpcInit_); taosMsleep(1000); } + void SetSrvContinueSend(CB cb) { + this->Stop(); + rpcInit_.cfp = cb; + this->Start(); + } void Stop() { if (this->transSrv == NULL) return; rpcClose(this->transSrv); this->transSrv = NULL; } - void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { + void SetSrvSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { this->Stop(); rpcInit_.cfp = cfp; this->Start(); @@ -174,9 +158,6 @@ static void processReq(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { } static void processContinueSend(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { - for (int i = 0; i < 9; i++) { - rpcRefHandle(pMsg->handle, TAOS_CONN_SERVER); - } for (int i = 0; i < 10; i++) { SRpcMsg rpcMsg = {0}; rpcMsg.pCont = rpcMallocCont(100); @@ -238,10 +219,6 @@ class TransObj { // srv->Stop(); } - void SetCliMFp(void *(*mfp)(void *parent, tmsg_t msgType)) { - // do nothing - cli->SetConstructFP(mfp); - } // call when link broken, and notify query or fetch stop void SetSrvContinueSend(void (*cfp)(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet)) { /////// @@ -279,7 +256,7 @@ class TransEnv : public ::testing::Test { }; TEST_F(TransEnv, 01sendAndRec) { - for (int i = 0; i < 1; i++) { + for (int i = 0; i < 10; i++) { SRpcMsg req = {0}, resp = {0}; req.msgType = 0; req.pCont = rpcMallocCont(10); @@ -322,22 +299,33 @@ TEST_F(TransEnv, clientUserDefined) { } TEST_F(TransEnv, cliPersistHandle) { - // tr->SetCliPersistFp(cliPersistHandle); SRpcMsg resp = {0}; + void * handle = NULL; for (int i = 0; i < 10; i++) { SRpcMsg req = {.handle = resp.handle, .persistHandle = 1}; req.msgType = 1; req.pCont = rpcMallocCont(10); req.contLen = 10; tr->cliSendAndRecv(&req, &resp); - if (i == 5) { - std::cout << "stop server" << std::endl; - tr->StopSrv(); - } - if (i >= 6) { - EXPECT_TRUE(resp.code != 0); - } + // if (i == 5) { + // std::cout << "stop server" << std::endl; + // tr->StopSrv(); + //} + // if (i >= 6) { + // EXPECT_TRUE(resp.code != 0); + //} + handle = resp.handle; } + rpcReleaseHandle(handle, TAOS_CONN_CLIENT); + for (int i = 0; i < 10; i++) { + SRpcMsg req = {0}; + req.msgType = 1; + req.pCont = rpcMallocCont(10); + req.contLen = 10; + tr->cliSendAndRecv(&req, &resp); + } + + taosMsleep(1000); ////////////////// } @@ -425,11 +413,7 @@ TEST_F(TransEnv, cliPersistHandleExcept) { TEST_F(TransEnv, multiCliPersistHandleExcept) { // conn broken } -TEST_F(TransEnv, queryExcept) { - // tr->SetSrvExceptFp(handleExcept); - - // query and conn is broken -} +TEST_F(TransEnv, queryExcept) {} TEST_F(TransEnv, noResp) { SRpcMsg resp = {0}; for (int i = 0; i < 5; i++) {