diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3bd764ff8d26962a0089573c3a5822ef37185eac..f106e07e37f37fff4da50d7fbaf346ef6664e296 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -159,11 +159,9 @@ static void cliIdleCb(uv_idle_t* handle); static void cliPrepareCb(uv_prepare_t* handle); static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd); -// static void cliConnBatchCb(uv_connect_t* req, int status); static void cliSendBatchCb(uv_write_t* req, int status); -// static void cliConnBatchCb(uv_connect_t* req, int status); -// callback after conn to server -// static void cliConnBatchCb(uv_connect_t* req, int status); + +SCliBatch* cliGetHeadFromList(SCliBatchList* pList); static bool cliRecvReleaseReq(SCliConn* conn, STransMsgHead* pHead); @@ -847,8 +845,11 @@ void cliSendBatch(SCliConn* pConn) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - SCliBatch* pBatch = pConn->pBatch; - int32_t wLen = pBatch->wLen; + SCliBatch* pBatch = pConn->pBatch; + SCliBatchList* pList = pBatch->pList; + pList->connCnt += 1; + + int32_t wLen = pBatch->wLen; uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); int i = 0; @@ -994,7 +995,7 @@ static void cliDestroyBatch(SCliBatch* pBatch) { taosMemoryFree(pBatch); } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { - if (pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { + if (pBatch == NULL || pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { return; } STrans* pTransInst = pThrd->pTransInst; @@ -1071,8 +1072,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) { SCliBatch* p = conn->pBatch; SCliBatchList* pBatchList = p->pList; - - int32_t empty = QUEUE_IS_EMPTY(&pBatchList->wq); + SCliBatch* nxtBatch = cliGetHeadFromList(pBatchList); pBatchList->connCnt -= 1; conn->pBatch = NULL; @@ -1081,23 +1081,17 @@ static void cliSendBatchCb(uv_write_t* req, int status) { tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize, uv_err_name(status)); cliHandleExcept(conn); - + cliHandleBatchReq(nxtBatch, thrd); } else { tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize); - if (empty == false) { - queue* h = QUEUE_HEAD(&pBatchList->wq); - QUEUE_REMOVE(h); - conn->pBatch = QUEUE_DATA(h, SCliBatch, listq); - - pBatchList->connCnt += 1; - pBatchList->len -= 1; + if (nxtBatch != NULL) { + conn->pBatch = nxtBatch; cliSendBatch(conn); - return; + } else { + addConnToPool(thrd->pool, conn); } - - addConnToPool(thrd->pool, conn); } cliDestroyBatch(p); @@ -1466,6 +1460,18 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { tTrace("cli process batch size:%d", count); } } +SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { + if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt >= pList->connMax) { + return NULL; + } + queue* hr = QUEUE_HEAD(&pList->wq); + QUEUE_REMOVE(hr); + + pList->len -= 1; + + SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); + return batch; +} static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { int count = 0; @@ -1528,6 +1534,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) { QUEUE_PUSH(&pBatch->wq, h); pBatch->batchSize += pMsg->msg.contLen; + pBatch->wLen += 1; } else { SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); QUEUE_INIT(&pBatch->wq); @@ -1551,17 +1558,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { void** pIter = taosHashIterate(pThrd->batchCache, NULL); while (pIter != NULL) { SCliBatchList* batchList = (SCliBatchList*)(*pIter); - if (QUEUE_IS_EMPTY(&batchList->wq) || batchList->connCnt >= batchList->connMax) { - continue; + SCliBatch* batch = cliGetHeadFromList(batchList); + if (batch != NULL) { + cliHandleBatchReq(batch, pThrd); } - queue* hr = QUEUE_HEAD(&batchList->wq); - QUEUE_REMOVE(hr); - - batchList->len -= 1; - - SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); - - cliHandleBatchReq(batch, pThrd); pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } @@ -1848,7 +1848,10 @@ static void destroyThrdObj(SCliThrd* pThrd) { SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); cliDestroyBatch(pBatch); } + taosMemoryFree(pBatchList->ip); + taosMemoryFree(pBatchList->dst); taosMemoryFree(pBatchList); + pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache);