diff --git a/include/util/tdef.h b/include/util/tdef.h index 48dedd3e3e5bc80f703662cc25644b6ccf8cac76..ad44daed46ee977d14d0b4ba98fa80f8453db887 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -406,7 +406,7 @@ typedef enum ELogicConditionType { #ifdef WINDOWS #define TSDB_MAX_RPC_THREADS 4 // windows pipe only support 4 connections. #else -#define TSDB_MAX_RPC_THREADS 10 +#define TSDB_MAX_RPC_THREADS 20 #endif #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5b116a46dea456be23045bbeb8c8571cd143272f..124a504f7bf85aa154d48d57f4582483844e41c6 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -131,7 +131,7 @@ STscObj* taos_connect_internal(const char* ip, const char* user, const char* pas p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; taosThreadMutexInit(&p->qnodeMutex, NULL); - p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); + p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores / 2); p->pAppHbMgr = appHbMgrInit(p, key); if (NULL == p->pAppHbMgr) { destroyAppInst(p); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 2e3389fb621d09ddc328203b6c7b1bfe01ea9ca0..aeef1b5277044c04d591ab0d5061c9bbf25a2cb7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -308,6 +308,9 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); + if (tsNumOfTaskQueueThreads >= 10) { + tsNumOfTaskQueueThreads = 10; + } if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, 0) != 0) return -1; return 0; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 5546d762f437a2afeb43f57e0cb30d13f6561319..f78fd33e4769fcf9328a03183f86242f4d2e4876 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -250,7 +250,7 @@ int32_t dmInitClient(SDnode *pDnode) { SRpcInit rpcInit = {0}; rpcInit.label = "DND-C"; - rpcInit.numOfThreads = 4; + rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 94bc128de9052457cc8d41c69ac33dc76fc2d32c..88888f2f8466f43650c4a8e28a470b92a6a0ba4d 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -58,6 +58,9 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->destroyFp = pInit->dfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + if (pRpc->numOfThreads <= 0) { + pRpc->numOfThreads = 1; + } uint32_t ip = 0; if (pInit->connType == TAOS_CONN_SERVER) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 55bfb57a827530569b56e90f53658392ca05460f..71cc14493f712df610840539a0fc6c35b19bb11d 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -656,6 +656,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { conn->stream = (uv_stream_t*)taosMemoryMalloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); conn->stream->data = conn; + transSetConnOption((uv_tcp_t*)conn->stream); uv_timer_t* timer = taosArrayGetSize(pThrd->timerList) > 0 ? *(uv_timer_t**)taosArrayPop(pThrd->timerList) : NULL; if (timer == NULL) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 2759fb5aeb4e6b0eb77b25a6cc68a14844d11523..ad8d57c97a7a8bc9f078ab2171d5a4a76d4f46b3 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -202,9 +202,8 @@ bool transReadComplete(SConnBuffer* connBuf) { } int transSetConnOption(uv_tcp_t* stream) { - uv_tcp_nodelay(stream, 0); - int ret = uv_tcp_keepalive(stream, 5, 60); - return ret; + return uv_tcp_nodelay(stream, 1); + // int ret = uv_tcp_keepalive(stream, 5, 60); } SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index b7fe404a4ed8ffdd3cfd92f1fcb06b8caba05a9b..395e28d68f0e037fdabd15c89e6c9b223adf37c3 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -846,7 +846,7 @@ static bool addHandleToAcceptloop(void* arg) { return true; } void* transWorkerThread(void* arg) { - setThreadName("trans-worker"); + setThreadName("trans-svr-work"); SWorkThrd* pThrd = (SWorkThrd*)arg; uv_run(pThrd->loop, UV_RUN_DEFAULT);