From 597d7d3d9b4f8e2676e2ba7adafbbee4333854a3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Feb 2023 20:47:38 +0800 Subject: [PATCH] opt transport --- source/libs/transport/src/transCli.c | 194 ++++++++++++++++++--------- 1 file changed, 133 insertions(+), 61 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e4fa91170d..3bd764ff8d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -21,14 +21,28 @@ typedef struct SConnList { } SConnList; typedef struct { - queue wq; - int32_t wLen; - int32_t batchSize; // - int32_t batch; + queue wq; + int32_t len; + + int connMax; + int connCnt; + int batchLenLimit; + char* dst; char* ip; uint16_t port; + +} SCliBatchList; + +typedef struct { + queue wq; + queue listq; + int32_t wLen; + int32_t batchSize; // + int32_t batch; + SCliBatchList* pList; } SCliBatch; + typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -866,14 +880,21 @@ void cliSendBatch(SCliConn* pConn) { pHead->magicNum = htonl(TRANS_MAGIC_NUM); } pHead->timestamp = taosHton64(taosGetTimestampUs()); - msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); + if (pHead->comp == 0) { + if (pTransInst->compressSize != -1 && pTransInst->compressSize < pMsg->contLen) { + msgLen = transCompressMsg(pMsg->pCont, pMsg->contLen) + sizeof(STransMsgHead); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + } else { + msgLen = (int32_t)ntohl((uint32_t)(pHead->msgLen)); + } wb[i++] = uv_buf_init((char*)pHead, msgLen); } uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); req->data = pConn; - tDebug("%p conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn, + tDebug("%s conn %p start to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(pConn), pConn, pBatch->wLen, pBatch->batchSize); uv_write(req, (uv_stream_t*)pConn->stream, wb, wLen, cliSendBatchCb); taosMemoryFree(wb); @@ -962,62 +983,37 @@ _RETURN: return; } -static SCliBatch* cliDumpBatch(SCliBatch* pBatch) { - SCliBatch* pNewBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); - - QUEUE_INIT(&pNewBatch->wq); - QUEUE_MOVE(&pBatch->wq, &pNewBatch->wq); - - pNewBatch->batchSize = pBatch->batchSize; - pNewBatch->batch = pBatch->batch; - pNewBatch->wLen = pBatch->wLen; - pNewBatch->dst = strdup(pBatch->dst); - pNewBatch->ip = strdup(pBatch->ip); - pNewBatch->port = pBatch->port; - - QUEUE_INIT(&pBatch->wq); - pBatch->batchSize = 0; - pBatch->batch = 0; - pBatch->wLen = 0; - - return pNewBatch; -} static void cliDestroyBatch(SCliBatch* pBatch) { while (!QUEUE_IS_EMPTY(&pBatch->wq)) { - queue* h = QUEUE_HEAD(&pBatch->wq); - SCliMsg* p = QUEUE_DATA(h, SCliMsg, q); + queue* h = QUEUE_HEAD(&pBatch->wq); + QUEUE_REMOVE(h); - QUEUE_REMOVE(&p->q); + SCliMsg* p = QUEUE_DATA(h, SCliMsg, q); destroyCmsg(p); } - taosMemoryFree(pBatch->ip); - taosMemoryFree(pBatch->dst); taosMemoryFree(pBatch); } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { if (pBatch->wLen == 0 || QUEUE_IS_EMPTY(&pBatch->wq)) { return; } - STrans* pTransInst = pThrd->pTransInst; - - SCliBatch* pNewBatch = cliDumpBatch(pBatch); + STrans* pTransInst = pThrd->pTransInst; + SCliBatchList* pList = pBatch->pList; - SCliConn* conn = getConnFromPool(pThrd->pool, pNewBatch->ip, pNewBatch->port); + SCliConn* conn = getConnFromPool(pThrd->pool, pList->ip, pList->port); - if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pNewBatch->ip, pNewBatch->port)) { - tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pNewBatch->wLen, - pNewBatch->batchSize); - cliDestroyBatch(pNewBatch); + if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pList->ip, pList->port)) { + tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, + pBatch->batchSize); + cliDestroyBatch(pBatch); return; } if (conn == NULL) { conn = cliCreateConn(pThrd); - conn->pBatch = pNewBatch; - conn->ip = strdup(pNewBatch->dst); + conn->pBatch = pBatch; + conn->ip = strdup(pList->dst); - char* ip = pNewBatch->ip; - uint16_t port = pNewBatch->port; - uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, ip); + uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, pList->ip); if (ipaddr == 0xffffffff) { uv_timer_stop(conn->timer); conn->timer->data = NULL; @@ -1030,9 +1026,9 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_addr.s_addr = ipaddr; - addr.sin_port = (uint16_t)htons(port); + addr.sin_port = (uint16_t)htons(pList->port); - tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pBatch->dst); + tTrace("%s conn %p try to connect to %s", pTransInst->label, conn, pList->dst); int32_t fd = taosCreateSocketWithTimeout(TRANS_CONN_TIMEOUT * 4); if (fd == -1) { tError("%s conn %p failed to create socket, reason:%s", transLabel(pTransInst), conn, @@ -1066,7 +1062,7 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { return; } - conn->pBatch = pNewBatch; + conn->pBatch = pBatch; cliSendBatch(conn); } static void cliSendBatchCb(uv_write_t* req, int status) { @@ -1074,15 +1070,33 @@ static void cliSendBatchCb(uv_write_t* req, int status) { SCliThrd* thrd = conn->hostThrd; SCliBatch* p = conn->pBatch; + SCliBatchList* pBatchList = p->pList; + + int32_t empty = QUEUE_IS_EMPTY(&pBatchList->wq); + pBatchList->connCnt -= 1; + conn->pBatch = NULL; if (status != 0) { - tDebug("%p conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, + tDebug("%s conn %p failed to send batch msg, batch size:%d, msgLen:%d, reason:%s", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize, uv_err_name(status)); cliHandleExcept(conn); + } else { - tDebug("%p conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, + tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize); + + if (empty == false) { + queue* h = QUEUE_HEAD(&pBatchList->wq); + QUEUE_REMOVE(h); + conn->pBatch = QUEUE_DATA(h, SCliBatch, listq); + + pBatchList->connCnt += 1; + pBatchList->len -= 1; + cliSendBatch(conn); + return; + } + addConnToPool(thrd->pool, conn); } @@ -1468,23 +1482,65 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { char key[TSDB_FQDN_LEN + 64] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key)); - if (ppBatch == NULL || *ppBatch == NULL) { + // SCliBatch** ppBatch = taosHashGet(pThrd->batchCache, key, sizeof(key)); + SCliBatchList** ppBatchList = taosHashGet(pThrd->batchCache, key, sizeof(key)); + if (ppBatchList == NULL || *ppBatchList == NULL) { + SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); + QUEUE_INIT(&pBatchList->wq); + pBatchList->connMax = 200; + pBatchList->connCnt = 0; + pBatchList->batchLenLimit = 16 * 1024; + pBatchList->ip = strdup(ip); + pBatchList->dst = strdup(key); + pBatchList->port = port; + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + QUEUE_PUSH(&pBatch->wq, h); pBatch->wLen += 1; pBatch->batchSize += pMsg->msg.contLen; + pBatch->pList = pBatchList; - pBatch->dst = strdup(key); - pBatch->ip = strdup(ip); - pBatch->port = (uint16_t)port; + QUEUE_PUSH(&pBatchList->wq, &pBatch->listq); - taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatch, sizeof(void*)); + taosHashPut(pThrd->batchCache, key, sizeof(key), &pBatchList, sizeof(void*)); } else { - QUEUE_PUSH(&(*ppBatch)->wq, h); - (*ppBatch)->wLen += 1; - (*ppBatch)->batchSize += pMsg->msg.contLen; + if (QUEUE_IS_EMPTY(&(*ppBatchList)->wq)) { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize = pMsg->msg.contLen; + pBatch->pList = *ppBatchList; + + QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); + (*ppBatchList)->len += 1; + + continue; + } + + queue* hdr = QUEUE_TAIL(&((*ppBatchList)->wq)); + SCliBatch* pBatch = QUEUE_DATA(hdr, SCliBatch, listq); + if ((pBatch->batchSize + pMsg->msg.contLen) < (*ppBatchList)->batchLenLimit) { + QUEUE_PUSH(&pBatch->wq, h); + pBatch->batchSize += pMsg->msg.contLen; + } else { + SCliBatch* pBatch = taosMemoryCalloc(1, sizeof(SCliBatch)); + QUEUE_INIT(&pBatch->wq); + QUEUE_INIT(&pBatch->listq); + + QUEUE_PUSH(&pBatch->wq, h); + pBatch->wLen += 1; + pBatch->batchSize += pMsg->msg.contLen; + pBatch->pList = *ppBatchList; + + QUEUE_PUSH(&((*ppBatchList)->wq), &pBatch->listq); + (*ppBatchList)->len += 1; + } } continue; } @@ -1494,7 +1550,16 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { void** pIter = taosHashIterate(pThrd->batchCache, NULL); while (pIter != NULL) { - SCliBatch* batch = (SCliBatch*)(*pIter); + SCliBatchList* batchList = (SCliBatchList*)(*pIter); + if (QUEUE_IS_EMPTY(&batchList->wq) || batchList->connCnt >= batchList->connMax) { + continue; + } + queue* hr = QUEUE_HEAD(&batchList->wq); + QUEUE_REMOVE(hr); + + batchList->len -= 1; + + SCliBatch* batch = QUEUE_DATA(hr, SCliBatch, listq); cliHandleBatchReq(batch, pThrd); pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); @@ -1775,8 +1840,15 @@ static void destroyThrdObj(SCliThrd* pThrd) { void** pIter = taosHashIterate(pThrd->batchCache, NULL); while (pIter != NULL) { - SCliBatch* batch = (SCliBatch*)(*pIter); - cliDestroyBatch(batch); + SCliBatchList* pBatchList = (SCliBatchList*)(*pIter); + while (!QUEUE_IS_EMPTY(&pBatchList->wq)) { + queue* h = QUEUE_HEAD(&pBatchList->wq); + QUEUE_REMOVE(h); + + SCliBatch* pBatch = QUEUE_DATA(h, SCliBatch, listq); + cliDestroyBatch(pBatch); + } + taosMemoryFree(pBatchList); pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); -- GitLab