diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 1a3e70e6e07d6c2a721eee07dfe4844ad1443c45..91f9a8ead209697618745e89b5b442b2f20d9c36 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -30,7 +30,8 @@ void* rpcOpen(const SRpcInit* pInit) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } pRpc->cfp = pInit->cfp; - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + // pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + pRpc->numOfThreads = pInit->numOfThreads; pRpc->connType = pInit->connType; pRpc->idleTime = pInit->idleTime; pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 1320e6e3ba5dcf109193d154f6586b50de072834..4ffaad3f84a5fab444aa0d72fe4c160019b68375 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -139,18 +139,16 @@ static void clientHandleResp(SCliConn* conn) { static void clientHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { clientConnDestroy(pConn, true); - return; } tDebug("conn %p destroy", pConn); SCliMsg* pMsg = pConn->data; + transFreeMsg((pMsg->msg.pCont)); + pMsg->msg.pCont = NULL; STransConnCtx* pCtx = pMsg->ctx; SRpcInfo* pRpc = pCtx->pTransInst; - transFreeMsg((pMsg->msg.pCont)); - pMsg->msg.pCont = NULL; - SRpcMsg rpcMsg = {0}; rpcMsg.ahandle = pCtx->ahandle; rpcMsg.code = -1; @@ -254,7 +252,8 @@ static bool clientReadComplete(SConnBuffer* data) { if (msgLen > data->len) { data->left = msgLen - data->len; return false; - } else { + } else if (msgLen == data->len) { + data->left = 0; return true; } } else { @@ -321,19 +320,19 @@ static void clientWriteCb(uv_write_t* req, int status) { SCliConn* pConn = req->data; if (status == 0) { tDebug("conn %p data already was written out", pConn); + SCliMsg* pMsg = pConn->data; + if (pMsg != NULL) { + transFreeMsg((pMsg->msg.pCont)); + pMsg->msg.pCont = NULL; + } + } else { tError("conn %p failed to write: %s", pConn, uv_err_name(status)); clientHandleExcept(pConn); return; } SCliThrdObj* pThrd = pConn->hostThrd; - // if (pConn->stream == NULL) { - // pConn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); - // uv_tcp_init(pThrd->loop, (uv_tcp_t*)pConn->stream); - // pConn->stream->data = pConn; - //} uv_read_start((uv_stream_t*)pConn->stream, clientAllocBufferCb, clientReadCb); - // impl later } static void clientWrite(SCliConn* pConn) { @@ -378,7 +377,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { tDebug("conn %p get from conn pool", conn); conn->data = pMsg; conn->writeReq->data = conn; - // transDestroyBuffer(&conn->readBuf); + transDestroyBuffer(&conn->readBuf); clientWrite(conn); } else { SCliConn* conn = calloc(1, sizeof(SCliConn)); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 8aa3995651b8a932b81e7cafaea820240f9402fa..485ed904e83f2000a9b9a0d552a0022e3babb004 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -306,6 +306,7 @@ void uvOnTimeoutCb(uv_timer_t* handle) { void uvOnWriteCb(uv_write_t* req, int status) { SSrvConn* conn = req->data; SSrvMsg* smsg = conn->pSrvMsg; + destroySrvMsg(conn); transClearBuffer(&conn->readBuf); if (status == 0) {