diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index acfd5dfb51553a37c856d8cd0792e6dd0e1f91ec..0cc0ab64eff68ff00e25213eb8ed57ba04916988 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -115,8 +115,9 @@ typedef struct SRpcInit { int32_t connLimitNum; int32_t connLimitLock; - int8_t supportBatch; // 0: no batch, 1. batch - void *parent; + int8_t supportBatch; // 0: no batch, 1. batch + int32_t batchSize; + void *parent; } SRpcInit; typedef struct { diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index f35352268f864f3c1a2c138b954e5e6513e9d9cc..4e9b7149e42956b88b8881c60901dd761b271eb9 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -286,11 +286,12 @@ int32_t dmInitClient(SDnode *pDnode) { int32_t connLimitNum = 10000 / (tsNumOfRpcThreads * 3); connLimitNum = TMAX(connLimitNum, 100); - connLimitNum = TMIN(connLimitNum, 600); + connLimitNum = TMIN(connLimitNum, 500); rpcInit.connLimitNum = connLimitNum; rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; + rpcInit.batchSize = 64 * 1024; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 13adb4d2b4a92a263a36d40b28ce0d007dabf416..1f3c98ad7283c3f1594a67499d6fad8c0f886f47 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -67,6 +67,7 @@ typedef struct { int32_t connLimitNum; int8_t connLimitLock; // 0: no lock. 1. lock int8_t supportBatch; // 0: no batch, 1: support batch + int32_t batchSize; int index; void* parent; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 38ec1c7fdccf6cb360ae141b52465ccb8c03fb88..16ea25a41abb70bb1a60c8a84cd3536d054779eb 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -70,6 +70,7 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->connLimitNum = pInit->connLimitNum; pRpc->connLimitLock = pInit->connLimitLock; pRpc->supportBatch = pInit->supportBatch; + pRpc->batchSize = pInit->batchSize; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; if (pRpc->numOfThreads <= 0) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f106e07e37f37fff4da50d7fbaf346ef6664e296..f0635d376cac106900f8c747c320baeef0ebfc36 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,6 +27,7 @@ typedef struct { int connMax; int connCnt; int batchLenLimit; + int sending; char* dst; char* ip; @@ -992,6 +993,8 @@ static void cliDestroyBatch(SCliBatch* pBatch) { SCliMsg* p = QUEUE_DATA(h, SCliMsg, q); destroyCmsg(p); } + SCliBatchList* p = pBatch->pList; + p->sending -= 1; taosMemoryFree(pBatch); } static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { @@ -1461,11 +1464,12 @@ static void cliNoBatchDealReq(queue* wq, SCliThrd* pThrd) { } } SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { - if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt >= pList->connMax) { + if (QUEUE_IS_EMPTY(&pList->wq) || pList->connCnt > pList->connMax || pList->sending > pList->connMax) { return NULL; } queue* hr = QUEUE_HEAD(&pList->wq); QUEUE_REMOVE(hr); + pList->sending += 1; pList->len -= 1; @@ -1474,6 +1478,8 @@ SCliBatch* cliGetHeadFromList(SCliBatchList* pList) { } static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { + STrans* pInst = pThrd->pTransInst; + int count = 0; while (!QUEUE_IS_EMPTY(wq)) { queue* h = QUEUE_HEAD(wq); @@ -1493,9 +1499,10 @@ static void cliBatchDealReq(queue* wq, SCliThrd* pThrd) { if (ppBatchList == NULL || *ppBatchList == NULL) { SCliBatchList* pBatchList = taosMemoryCalloc(1, sizeof(SCliBatchList)); QUEUE_INIT(&pBatchList->wq); - pBatchList->connMax = 200; + pBatchList->connMax = pInst->connLimitNum; pBatchList->connCnt = 0; - pBatchList->batchLenLimit = 16 * 1024; + pBatchList->batchLenLimit = pInst->batchSize; + pBatchList->ip = strdup(ip); pBatchList->dst = strdup(key); pBatchList->port = port; diff --git a/source/libs/transport/test/cliBench.c b/source/libs/transport/test/cliBench.c index 5901a71929deadb40d96ccf4f2e5a69544f3a7fa..aaee162cd754eb1e209fc21d8568fd703c57d303 100644 --- a/source/libs/transport/test/cliBench.c +++ b/source/libs/transport/test/cliBench.c @@ -114,8 +114,9 @@ int main(int argc, char *argv[]) { rpcInit.user = "michael"; rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.connLimitNum = 300; + rpcInit.connLimitNum = 10; rpcInit.connLimitLock = 1; + rpcInit.batchSize = 16 * 1024; rpcInit.supportBatch = 1; rpcDebugFlag = 135;