From f19fdaa142f70202854a9835ef1b02c9f1d50232 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 17 Feb 2023 22:40:27 +0800 Subject: [PATCH] opt trans --- include/libs/transport/trpc.h | 5 +++-- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 3 ++- source/libs/transport/inc/transportInt.h | 1 + source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 13 ++++++++++--- source/libs/transport/test/cliBench.c | 3 ++- 6 files changed, 19 insertions(+), 7 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index acfd5dfb51..0cc0ab64ef 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -115,8 +115,9 @@ typedef struct SRpcInit { int32_t connLimitNum; int32_t connLimitLock; - int8_t supportBatch; // 0: no batch, 1. batch - void *parent; + int8_t supportBatch; // 0: no batch, 1. batch + int32_t batchSize; + void *parent; } SRpcInit; typedef struct { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index f35352268f..4e9b7149e4 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -286,11 +286,12 @@ int32_t dmInitClient(SDnode *pDnode) { int32_t connLimitNum = 10000 / (tsNumOfRpcThreads * 3); connLimitNum = TMAX(connLimitNum, 100); - connLimitNum = TMIN(connLimitNum, 600); + connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; + rpcInit.batchSize = 64 * 1024; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 13adb4d2b4..1f3c98ad72 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -67,6 +67,7 @@ typedef struct { int32_t connLimitNum; int8_t connLimitLock; // 0: no lock. 1. lock int8_t supportBatch; // 0: no batch, 1: support batch + int32_t batchSize; int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 38ec1c7fdc..16ea25a41a 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -70,6 +70,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->connLimitNum = pInit->connLimitNum; pRpc->connLimitLock = pInit->connLimitLock; pRpc->supportBatch = pInit->supportBatch; + pRpc->batchSize = pInit->batchSize; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f106e07e37..f0635d376c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,6 +27,7 @@ typedef struct { int connMax; int connCnt; int batchLenLimit; + int sending; char* dst; char* ip; @@ -992,6 +993,8 @@ static void cliDestroyBatch(SCliBatch* pBatch) { SCliMsg* p = QUEUE_DATA(h, SCliMsg, q); destroyCmsg(p); } + SCliBatchList* p = pBatch->pList; + p->sending -= 1; taosMemoryFree(pBatch); } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { @@ -1461,11 +1464,12 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { } } SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { - if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt >= pList->connMax) { + if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) { return NULL; } queue* hr = QUEUE_HEAD(&pList->wq); QUEUE_REMOVE(hr); + pList->sending += 1; pList->len -= 1; @@ -1474,6 +1478,8 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { } static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { + STrans* pInst = pThrd->pTransInst; + int count = 0; while (!QUEUE_IS_EMPTY(wq)) { queue* h = QUEUE_HEAD(wq); @@ -1493,9 +1499,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { if (ppBatchList == NULL || *ppBatchList == NULL) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); QUEUE_INIT(&pBatchList->wq); - pBatchList->connMax = 200; + pBatchList->connMax = pInst->connLimitNum; pBatchList->connCnt = 0; - pBatchList->batchLenLimit = 16 * 1024; + pBatchList->batchLenLimit = pInst->batchSize; + pBatchList->ip = strdup(ip); pBatchList->dst = strdup(key); pBatchList->port = port; diff --git a/source/libs/transport/test/cliBench.c b/source/libs/transport/test/cliBench.c index 5901a71929..aaee162cd7 100644 --- a/source/libs/transport/test/cliBench.c +++ b/source/libs/transport/test/cliBench.c @@ -114,8 +114,9 @@ int main(int argc, char *argv[]) { rpcInit.user = "michael"; rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.connLimitNum = 300; + rpcInit.connLimitNum = 10; rpcInit.connLimitLock = 1; + rpcInit.batchSize = 16 * 1024; rpcInit.supportBatch = 1; rpcDebugFlag = 135; -- GitLab