From ee28ab3bf7271aee41c634d9ab44714c276df8fe Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 16 Feb 2023 18:06:12 +0800 Subject: [PATCH] enh: batch send --- source/libs/transport/src/transCli.c | 29 +++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 54203e8d7c..47845b6336 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -837,10 +837,9 @@ void cliSendBatch(SCliConn* pConn) { uv_buf_t* wb = taosMemoryCalloc(wLen, sizeof(uv_buf_t)); int i = 0; - while (!QUEUE_IS_EMPTY(&pBatch->wq)) { - queue* h = QUEUE_HEAD(&pBatch->wq); + queue* h = NULL; + QUEUE_FOREACH(h, &pBatch->wq) { SCliMsg* pCliMsg = QUEUE_DATA(h, SCliMsg, q); - QUEUE_REMOVE(&pCliMsg->q); STransConnCtx* pCtx = pCliMsg->ctx; @@ -878,7 +877,6 @@ void cliSendBatch(SCliConn* pConn) { wb[i++] = uv_buf_init((char*)pHead, msgLen); } - pBatch->wLen = 0; uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); req->data = pConn; uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); @@ -970,7 +968,13 @@ _RETURN: static SCliBatch* cliDumpBatch(SCliBatch* pBatch) { SCliBatch* pNewBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - memcpy(pNewBatch->wq, pBatch->wq, sizeof(pBatch->wq)); + + QUEUE_INIT(&pNewBatch->wq); + while (!QUEUE_IS_EMPTY(&pBatch->wq)) { + queue* h = QUEUE_HEAD(&pBatch->wq); + QUEUE_REMOVE(h); + QUEUE_PUSH(&pNewBatch->wq, h); + } pNewBatch->batchSize = pBatch->batchSize; pNewBatch->batch = pBatch->batch; @@ -1027,7 +1031,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { addr.sin_addr.s_addr = ipaddr; addr.sin_port = (uint16_t)htons(pBatch->port); - tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pBatch->ip); + tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pBatch->dst); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); if (fd == -1) { tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1079,7 +1083,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) { static void cliHandleFastFail(SCliConn* pConn, int status) { SCliThrd* pThrd = pConn->hostThrd; STrans* pTransInst = pThrd->pTransInst; - tError("conn %p free twice", pConn); + tError("conn %p free twice, reason:%s", pConn, uv_err_name(status)); if (pConn->pBatch == NULL) { SCliMsg* pMsg = transQueueGet(&pConn->cliMsgs, 0); @@ -1443,7 +1447,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { QUEUE_REMOVE(h); SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); - if (REQUEST_NO_RESP(&pMsg->msg)) { + if (pMsg->type == Normal && REQUEST_NO_RESP(&pMsg->msg)) { STransConnCtx* pCtx = pMsg->ctx; char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); @@ -1469,7 +1473,7 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { (*ppBatch)->wLen += 1; (*ppBatch)->batchSize += pMsg->msg.contLen; } - return; + continue; } (*cliAsyncHandle[pMsg->type])(pMsg, pThrd); count++; @@ -1751,6 +1755,13 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->failFastCache); taosHashCleanup(pThrd->connLimitCache); + + void** pIter = taosHashIterate(pThrd->batchCache, NULL); + while (pIter != NULL) { + SCliBatch* batch = (SCliBatch*)(*pIter); + cliDestroyBatch(batch); + pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); + } taosHashCleanup(pThrd->batchCache); taosMemoryFree(pThrd); } -- GitLab