diff --git a/include/common/tglobal.h b/include/common/tglobal.h index b5fd6c0270a0cb84d15d298379afb81968b4cf56..b35b46021168ed38058f0bdd4a20742125d02204 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -50,6 +50,7 @@ extern int32_t tsTagFilterResCacheSize; // queue & threads extern int32_t tsNumOfRpcThreads; extern int32_t tsNumOfRpcSessions; +extern int32_t tsTimeToGetAvailableConn; extern int32_t tsNumOfCommitThreads; extern int32_t tsNumOfTaskQueueThreads; extern int32_t tsNumOfMnodeQueryThreads; diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 0cc0ab64eff68ff00e25213eb8ed57ba04916988..c73e5c127af03cdc07e47c1ca95dd775a38599f2 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -114,7 +114,7 @@ typedef struct SRpcInit { int32_t connLimitNum; int32_t connLimitLock; - + int32_t timeToGetConn; int8_t supportBatch; // 0: no batch, 1. batch int32_t batchSize; void *parent; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 23486480414ed605049805ef44cbc6b03ddcc55e..79c5baf43d8971d2e4384ddec6ce3c92a14c25bc 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -144,7 +144,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; rpcInit.label = "TSC"; - rpcInit.numOfThreads = numOfThread; + rpcInit.numOfThreads = tsNumOfRpcThreads; rpcInit.cfp = processMsgFromServer; rpcInit.rfp = clientRpcRfp; rpcInit.sessions = 1024; @@ -159,6 +159,12 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { rpcInit.retryMaxInterval = tsRedirectMaxPeriod; rpcInit.retryMaxTimouet = tsMaxRetryWaitTime; + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 5); + connLimitNum = TMAX(connLimitNum, 10); + connLimitNum = TMIN(connLimitNum, 500); + rpcInit.connLimitNum = connLimitNum; + rpcInit.timeToGetConn = 
tsTimeToGetAvailableConn; + void *pDnodeConn = rpcOpen(&rpcInit); if (pDnodeConn == NULL) { tscError("failed to init connection to server"); @@ -517,7 +523,7 @@ void taos_init_imp(void) { if (code) { printf("failed to init memory dbg, error:%s\n", tstrerror(code)); } else { - tsAsyncLog = false; + tsAsyncLog = false; printf("memory dbg enabled\n"); } } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a6d2fad816f8149cd27756c9356fed770239e1e1..7bdbc3dc56bcab2734670cedac28ca9c411cc78b 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2034,6 +2034,12 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de rpcInit.compressSize = tsCompressMsgSize; rpcInit.user = "_dnd"; + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); + connLimitNum = TMAX(connLimitNum, 10); + connLimitNum = TMIN(connLimitNum, 500); + rpcInit.connLimitNum = connLimitNum; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) { tscError("failed to init server status client"); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 02d83af3a7e66396a0b1585ad0a823e618f558d4..99795fcc794ecf875497abf3bb180cfd92e4919d 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -41,7 +41,8 @@ bool tsPrintAuth = false; // queue & threads int32_t tsNumOfRpcThreads = 1; -int32_t tsNumOfRpcSessions = 2000; +int32_t tsNumOfRpcSessions = 5000; +int32_t tsTimeToGetAvailableConn = 100000; int32_t tsNumOfCommitThreads = 2; int32_t tsNumOfTaskQueueThreads = 4; int32_t tsNumOfMnodeQueryThreads = 4; @@ -326,6 +327,12 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1; + tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 100000); + if (cfgAddInt32(pCfg, 
"numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1; + + tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000); + if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1; + tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); if (tsNumOfTaskQueueThreads >= 10) { @@ -397,6 +404,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000); if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1; + tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000); + if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1; + tsNumOfCommitThreads = tsNumOfCores / 2; tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4); if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1; @@ -517,6 +527,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } + pItem = cfgGetItem(tsCfg, "timeToGetAvailableConn"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsTimeToGetAvailableConn = 1000; + tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000); + pItem->i32 = tsTimeToGetAvailableConn; + pItem->stype = stype; + } + pItem = cfgGetItem(tsCfg, "numOfCommitThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { tsNumOfCommitThreads = numOfCores / 2; @@ -698,6 +716,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; + + tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; + + tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; return 0; } @@ -735,6 +757,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfRpcThreads = 
cfgGetItem(pCfg, "numOfRpcThreads")->i32; tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32; + tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32; + tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32; tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32; tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32; @@ -742,7 +766,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; - // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 23a047d49aff5c76c5b04140c7301898e6052478..ea46b7069385fba0c204c5e78f9f7e7983053d85 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -93,15 +93,15 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { break; } -/* -pDnode is null, TD-22618 -at trans.c line 91 -before this line, dmProcessRpcMsg callback is set -after this line, parent is set -so when dmProcessRpcMsg is called, pDonde is still null. -*/ - if (pDnode != NULL){ - if(pDnode->status != DND_STAT_RUNNING) { + /* + pDnode is null, TD-22618 + at trans.c line 91 + before this line, dmProcessRpcMsg callback is set + after this line, parent is set + so when dmProcessRpcMsg is called, pDnode is still null. 
+ */ + if (pDnode != NULL) { + if (pDnode->status != DND_STAT_RUNNING) { if (pRpc->msgType == TDMT_DND_SERVER_STATUS) { dmProcessServerStartupStatus(pDnode, pRpc); return; @@ -113,7 +113,7 @@ so when dmProcessRpcMsg is called, pDonde is still null. } goto _OVER; } - } + } } else { terrno = TSDB_CODE_APP_IS_STARTING; goto _OVER; @@ -304,6 +304,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.connLimitLock = 1; rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; pTrans->clientRpc = rpcOpen(&rpcInit); if (pTrans->clientRpc == NULL) { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index fe6ed7d785c7b6a6d99adbe9f930294bf8b950ef..dc6fc3ad7420ae7ebe08feecf336aa57121324c3 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -606,9 +606,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) { } static bool udfdRpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER || - code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || - code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || - code == TSDB_CODE_APP_IS_STOPPING) { + code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING || + code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; @@ -673,6 +672,12 @@ int32_t udfdOpenClientRpc() { rpcInit.rfp = udfdRpcRfp; rpcInit.compressSize = tsCompressMsgSize; + int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3); + connLimitNum = TMAX(connLimitNum, 10); + connLimitNum = TMIN(connLimitNum, 500); + rpcInit.connLimitNum = connLimitNum; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; + 
global.clientRpc = rpcOpen(&rpcInit); if (global.clientRpc == NULL) { fnError("failed to init dnode rpc client"); @@ -765,7 +770,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) { } void udfdHandleRequest(SUdfdUvConn *conn) { - char *inputBuf = conn->inputBuf; + char *inputBuf = conn->inputBuf; int32_t inputLen = conn->inputLen; uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t)); @@ -784,7 +789,7 @@ void udfdHandleRequest(SUdfdUvConn *conn) { void udfdPipeCloseCb(uv_handle_t *pipe) { SUdfdUvConn *conn = pipe->data; - SUvUdfWork* pWork = conn->pWorkList; + SUvUdfWork *pWork = conn->pWorkList; while (pWork != NULL) { pWork->conn = NULL; pWork = pWork->pWorkNext; diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5ff67c87ca92e85869b885f65c69ab9bc43ea1c2..a2c486767f931ac7644217dba24d9cab55b48c7e 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -148,7 +148,8 @@ typedef struct { int8_t epsetRetryCnt; int32_t retryCode; - int hThrdIdx; + void* task; + int hThrdIdx; } STransConnCtx; #pragma pack(push, 1) diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 1f3c98ad7283c3f1594a67499d6fad8c0f886f47..8ea0064d4485c031a49b39bad70fd41e0aec7a85 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -64,11 +64,11 @@ typedef struct { void (*destroyFp)(void* ahandle); bool (*failFastFp)(tmsg_t msgType); - int32_t connLimitNum; - int8_t connLimitLock; // 0: no lock. 1. lock - int8_t supportBatch; // 0: no batch, 1: support batch - int32_t batchSize; - + int32_t connLimitNum; + int8_t connLimitLock; // 0: no lock. 1. 
lock + int8_t supportBatch; // 0: no batch, 1: support batch + int32_t batchSize; + int32_t timeToGetConn; int index; void* parent; void* tcphandle; // returned handle from TCP initialization diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index f5f3b52f50788e84131d8a8101ce7864e7fbdbaa..0771f9198aa5e20fe37b95070781b81d01200351 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -66,6 +66,10 @@ void* rpcOpen(const SRpcInit* pInit) { pRpc->destroyFp = pInit->dfp; pRpc->failFastFp = pInit->ffp; pRpc->connLimitNum = pInit->connLimitNum; + if (pRpc->connLimitNum == 0) { + pRpc->connLimitNum = 20; + } + pRpc->connLimitLock = pInit->connLimitLock; pRpc->supportBatch = pInit->supportBatch; pRpc->batchSize = pInit->batchSize; @@ -90,7 +94,10 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->user) { tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user)); } - + pRpc->timeToGetConn = pInit->timeToGetConn; + if (pRpc->timeToGetConn == 0) { + pRpc->timeToGetConn = 10 * 1000; + } pRpc->tcphandle = (*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e008320222e2c9f2ace809a1a6de919a90b33677..be34cb6c8b1cb103643c1440937251fd976fd434 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -13,9 +13,15 @@ */ #include "transComm.h" +typedef struct { + int32_t numOfConn; + queue msgQ; +} SMsgList; + typedef struct SConnList { - queue conns; - int32_t size; + queue conns; + int32_t size; + SMsgList* list; } SConnList; typedef struct { @@ -100,6 +106,7 @@ typedef struct SCliThrd { TdThreadMutex msgMtx; SDelayQueue* delayQueue; SDelayQueue* timeoutQueue; + SDelayQueue* waitConnQueue; uint64_t nextTimeout; // next timeout void* pTransInst; // @@ -109,7 +116,6 @@ typedef struct SCliThrd { SCvtAddr cvtAddr; SHashObj* failFastCache; - 
SHashObj* connLimitCache; SHashObj* batchCache; SCliMsg* stopMsg; @@ -134,8 +140,8 @@ typedef struct { // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); -static void* destroyConnPool(void* pool); -static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); +static void* destroyConnPool(SCliThrd* thread); +static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); @@ -175,7 +181,8 @@ static void cliSend(SCliConn* pConn); static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port); +static void doFreeTimeoutMsg(void* param); +static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg); // cli util func static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); @@ -193,6 +200,7 @@ static void cliHandleExcept(SCliConn* conn); static void cliReleaseUnfinishedMsg(SCliConn* conn); static void cliHandleFastFail(SCliConn* pConn, int status); +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd); // handle req from app static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd); static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd); @@ -333,12 +341,8 @@ bool cliMaySendCachedMsg(SCliConn* conn) { if (!transQueueEmpty(&conn->cliMsgs)) { SCliMsg* pCliMsg = NULL; CONN_GET_NEXT_SENDMSG(conn); - if (pCliMsg == NULL) - return false; - else { - cliSend(conn); - return true; - } + cliSend(conn); + return true; } return false; _RETURN: @@ -545,7 +549,8 @@ void* createConnPool(int size) { // thread local, no lock return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); } -void* destroyConnPool(void* pool) { +void* destroyConnPool(SCliThrd* pThrd) { + void* pool = pThrd->pool; SConnList* connList = 
taosHashIterate((SHashObj*)pool, NULL); while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conns)) { @@ -553,34 +558,130 @@ void* destroyConnPool(void* pool) { SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } + + SMsgList* msglist = connList->list; + while (!QUEUE_IS_EMPTY(&msglist->msgQ)) { + queue* h = QUEUE_HEAD(&msglist->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task); + pMsg->ctx->task = NULL; + + doNotifyApp(pMsg, pThrd); + } + taosMemoryFree(msglist); + connList = taosHashIterate((SHashObj*)pool, connList); } taosHashCleanup(pool); return NULL; } -static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); +static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { + void* pool = pThrd->pool; + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + STrans* pTranInst = pThrd->pTransInst; + if (plist == NULL) { + SConnList list = {0}; + taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key)); + + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + QUEUE_INIT(&nList->msgQ); + nList->numOfConn++; + + QUEUE_INIT(&plist->conns); + plist->list = nList; + } + + if (QUEUE_IS_EMPTY(&plist->conns)) { + if (plist->list->numOfConn >= pTranInst->connLimitNum) { + *exceed = true; + } + return NULL; + } + + queue* h = QUEUE_TAIL(&plist->conns); + QUEUE_REMOVE(h); + plist->size -= 1; + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); + conn->status = ConnNormal; + QUEUE_INIT(&conn->q); + + if (conn->task != NULL) { + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + conn->task = NULL; + } + return conn; +} + +static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { + void* pool = pThrd->pool; + STrans* pTransInst = 
pThrd->pTransInst; SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { SConnList list = {0}; taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - if (plist == NULL) return NULL; + plist = taosHashGet(pool, key, strlen(key)); + + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + QUEUE_INIT(&nList->msgQ); + nList->numOfConn++; + QUEUE_INIT(&plist->conns); + plist->list = nList; } + STraceId* trace = &(*pMsg)->msg.info.traceId; + // no avaliable conn in pool if (QUEUE_IS_EMPTY(&plist->conns)) { + SMsgList* list = plist->list; + if ((list)->numOfConn >= pTransInst->connLimitNum) { + STraceId* trace = &(*pMsg)->msg.info.traceId; + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = *pMsg; + arg->param2 = pThrd; + (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + + tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); + + QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); + *pMsg = NULL; + } else { + // send msg in delay queue + if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) { + STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); + arg->param1 = *pMsg; + arg->param2 = pThrd; + (*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn); + tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, + TMSG_INFO((*pMsg)->msg.msgType)); + + QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q); + queue* h = QUEUE_HEAD(&(list)->msgQ); + QUEUE_REMOVE(h); + SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q); + + *pMsg = ans; + + trace = &(*pMsg)->msg.info.traceId; + tGTrace("%s msg %s pop from delay queue, start to send", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType)); + transDQCancel(pThrd->waitConnQueue, ans->ctx->task); + } + list->numOfConn++; + } return NULL; } + queue* h = 
QUEUE_TAIL(&plist->conns); plist->size -= 1; - queue* h = QUEUE_HEAD(&plist->conns); + QUEUE_REMOVE(h); + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; - QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); if (conn->task != NULL) { @@ -608,18 +709,34 @@ static void addConnToPool(void* pool, SCliConn* conn) { cliDestroyConnMsgs(conn, false); - conn->status = ConnInPool; - if (conn->list == NULL) { - tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip)); - } else { - tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); } + + SConnList* pList = conn->list; + SMsgList* msgList = pList->list; + if (!QUEUE_IS_EMPTY(&msgList->msgQ)) { + queue* h = QUEUE_HEAD(&(msgList)->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + + transDQCancel(thrd->waitConnQueue, pMsg->ctx->task); + pMsg->ctx->task = NULL; + + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); + transQueuePush(&conn->cliMsgs, pMsg); + + conn->status = ConnNormal; + cliSend(conn); + return; + } + + conn->status = ConnInPool; QUEUE_PUSH(&conn->list->conns, &conn->q); conn->list->size += 1; - if (conn->list->size >= 250) { + if (conn->list->size >= 20) { STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); arg->param1 = conn; arg->param2 = thrd; @@ -741,8 +858,20 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { static void cliDestroyConn(SCliConn* conn, bool clear) { SCliThrd* pThrd = conn->hostThrd; tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); + QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); + + if (conn->list != NULL) { + SConnList* connList = conn->list; + connList->list->numOfConn--; + connList->size--; + } else { + SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip)); + if (connList != NULL) connList->list->numOfConn--; + } + conn->list = NULL; 
+ transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; @@ -777,9 +906,6 @@ static void cliDestroy(uv_handle_t* handle) { conn->timer->data = NULL; conn->timer = NULL; } - int32_t* oVal = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); - int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1; - taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal)); atomic_sub_fetch_32(&pThrd->connCount, 1); @@ -1012,11 +1138,15 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; SCliBatchList* pList = pBatch->pList; - SCliConn* conn = getConnFromPool(pThrd->pool, pList->ip, pList->port); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); + + bool exceed = false; + SCliConn* conn = getConnFromPool(pThrd, key, &exceed); - if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pList->ip, pList->port)) { - tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, - pBatch->batchSize); + if (conn == NULL && exceed) { + tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen, + pBatch->batchSize, pTransInst->connLimitNum); cliDestroyBatch(pBatch); return; } @@ -1176,10 +1306,6 @@ void cliConnCb(uv_connect_t* req, int status) { return; } - int32_t* oVal = taosHashGet(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip)); - int32_t nVal = oVal == NULL ? 
0 : (*oVal) + 1; - taosHashPut(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal)); - struct sockaddr peername, sockname; int addrlen = sizeof(peername); uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); @@ -1197,6 +1323,29 @@ void cliConnCb(uv_connect_t* req, int status) { } } +static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) { + STransConnCtx* pCtx = pMsg->ctx; + STrans* pTransInst = pThrd->pTransInst; + + STransMsg transMsg = {0}; + transMsg.contLen = 0; + transMsg.pCont = NULL; + transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS; + transMsg.msgType = pMsg->msg.msgType + 1; + transMsg.info.ahandle = pMsg->ctx->ahandle; + transMsg.info.traceId = pMsg->msg.info.traceId; + transMsg.info.hasEpSet = false; + if (pCtx->pSem != NULL) { + if (pCtx->pRsp == NULL) { + } else { + memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); + } + } else { + pTransInst->cfp(pTransInst->parent, &transMsg, NULL); + } + + destroyCmsg(pMsg); +} static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) { pThrd->stopMsg = pMsg; @@ -1206,7 +1355,8 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { pThrd->quit = true; tDebug("cli work thread %p start to quit", pThrd); destroyCmsg(pMsg); - destroyConnPool(pThrd->pool); + + destroyConnPool(pThrd); uv_walk(pThrd->loop, cliWalkCb, NULL); } static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { @@ -1239,11 +1389,11 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); } -SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { - STransConnCtx* pCtx = pMsg->ctx; +SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { + STransConnCtx* pCtx = (*pMsg)->ctx; SCliConn* conn = NULL; - int64_t refId = (int64_t)(pMsg->msg.info.handle); + int64_t refId = (int64_t)((*pMsg)->msg.info.handle); if (refId != 0) { SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId); 
if (exh == NULL) { @@ -1253,7 +1403,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { } else { conn = exh->handle; if (conn == NULL) { - conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); + conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) specifyConnRef(conn, true, refId); } transReleaseExHandle(transGetRefMgt(), refId); @@ -1261,7 +1411,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { return conn; }; - conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); + conn = getConnFromPool2(pThrd, addr, pMsg); if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { @@ -1319,57 +1469,34 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { return; } -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) { - STrans* pTransInst = pThrd->pTransInst; - - // STransConnCtx* pCtx = pMsg->ctx; - // char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - // int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - - int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key)); - if (val == NULL) return 0; +static void doFreeTimeoutMsg(void* param) { + STaskArg* arg = param; + SCliMsg* pMsg = arg->param1; + SCliThrd* pThrd = arg->param2; + STrans* pTransInst = pThrd->pTransInst; - if (*val >= pTransInst->connLimitNum) { - return -1; - } - return 0; + QUEUE_REMOVE(&pMsg->q); + STraceId* trace = &pMsg->msg.info.traceId; + tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); + doNotifyApp(pMsg, pThrd); + taosMemoryFree(arg); } void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { - STrans* pTransInst = pThrd->pTransInst; - STransConnCtx* pCtx = pMsg->ctx; - - 
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); - STraceId* trace = &pMsg->msg.info.traceId; - char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + STrans* pTransInst = pThrd->pTransInst; - if (!EPSET_IS_VALID(&pCtx->epSet)) { - tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); + cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr); + if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) { destroyCmsg(pMsg); return; } - if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - - SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); - if (item != NULL) { - int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp); - if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) { - tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, - TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse); - destroyCmsg(pMsg); - return; - } - } - } + char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet); + char addr[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); bool ignore = false; - SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); + SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr); if (ignore == true) { // persist conn already release by server STransMsg resp; @@ -1380,16 +1507,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); return; } - - if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, ip, port)) { - tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType), - tstrerror(TSDB_CODE_RPC_MAX_SESSIONS)); - destroyCmsg(pMsg); + if (conn == 
NULL && pMsg == NULL) { return; } + STraceId* trace = &pMsg->msg.info.traceId; if (conn != NULL) { - transCtxMerge(&conn->ctx, &pCtx->appCtx); + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); cliSend(conn); } else { @@ -1398,15 +1522,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { int64_t refId = (int64_t)pMsg->msg.info.handle; if (refId != 0) specifyConnRef(conn, true, refId); - transCtxMerge(&conn->ctx, &pCtx->appCtx); + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - char key[TSDB_FQDN_LEN + 64] = {0}; - char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - CONN_CONSTRUCT_HASH_KEY(key, fqdn, port); - - conn->ip = taosStrdup(key); + conn->ip = taosStrdup(addr); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); if (ipaddr == 0xffffffff) { @@ -1830,14 +1949,14 @@ static SCliThrd* createThrdObj(void* trans) { transDQCreate(pThrd->loop, &pThrd->timeoutQueue); + transDQCreate(pThrd->loop, &pThrd->waitConnQueue); + pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->pTransInst = trans; pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, - pTransInst->connLimitLock == 0 ? 
HASH_NO_LOCK : HASH_ENTRY_LOCK); pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -1857,6 +1976,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle); transDQDestroy(pThrd->timeoutQueue, NULL); + transDQDestroy(pThrd->waitConnQueue, NULL); tDebug("thread destroy %" PRId64, pThrd->pid); for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) { @@ -1868,7 +1988,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->failFastCache); - taosHashCleanup(pThrd->connLimitCache); void** pIter = taosHashIterate(pThrd->batchCache, NULL); while (pIter != NULL) { diff --git a/tools/shell/src/shellNettest.c b/tools/shell/src/shellNettest.c index 52ce37b22c7666e5c068937e5b073b6ffb0c530c..1a6ac3489dad818feac855625d5cc502875973e0 100644 --- a/tools/shell/src/shellNettest.c +++ b/tools/shell/src/shellNettest.c @@ -21,7 +21,7 @@ static void shellWorkAsClient() { SRpcInit rpcInit = {0}; SEpSet epSet = {.inUse = 0, .numOfEps = 1}; SRpcMsg rpcRsp = {0}; - void * clientRpc = NULL; + void *clientRpc = NULL; char pass[TSDB_PASSWORD_LEN + 1] = {0}; taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass); @@ -31,6 +31,7 @@ static void shellWorkAsClient() { rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.user = "_dnd"; + rpcInit.timeToGetConn = tsTimeToGetAvailableConn; clientRpc = rpcOpen(&rpcInit); if (clientRpc == NULL) {