diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c
index 06ef7b7c9c702c3b3517efbfbf026a970305ea36..6122a1d4657c90d5c826b7ea6907bdc4a4a1e70a 100644
--- a/source/client/src/clientEnv.c
+++ b/source/client/src/clientEnv.c
@@ -23,12 +23,12 @@
 #include "scheduler.h"
 #include "tcache.h"
 #include "tglobal.h"
+#include "thttp.h"
 #include "tmsg.h"
 #include "tref.h"
 #include "trpc.h"
 #include "tsched.h"
 #include "ttime.h"
-#include "thttp.h"
 
 #define TSC_VAR_NOT_RELEASE 1
 #define TSC_VAR_RELEASED    0
@@ -80,17 +80,18 @@ static void deregisterRequest(SRequestObj *pRequest) {
            pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
 
   if (pRequest->pQuery && pRequest->pQuery->pRoot) {
-    if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->pQuery->pRoot->type && (0 == ((SVnodeModifOpStmt*)pRequest->pQuery->pRoot)->sqlNodeType)) {
+    if (QUERY_NODE_VNODE_MODIF_STMT == pRequest->pQuery->pRoot->type &&
+        (0 == ((SVnodeModifOpStmt *)pRequest->pQuery->pRoot)->sqlNodeType)) {
       tscDebug("insert duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
-           "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
-           duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
-           pRequest->metric.planCostUs, pRequest->metric.execCostUs);
+               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
+               duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
+               pRequest->metric.planCostUs, pRequest->metric.execCostUs);
 
       atomic_add_fetch_64((int64_t *)&pActivity->insertElapsedTime, duration);
     } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
       tscDebug("query duration %" PRId64 "us: parseCost:%" PRId64 "us, ctgCost:%" PRId64 "us, analyseCost:%" PRId64
-           "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
-           duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
-           pRequest->metric.planCostUs, pRequest->metric.execCostUs);
+               "us, planCost:%" PRId64 "us, exec:%" PRId64 "us",
+               duration, pRequest->metric.parseCostUs, pRequest->metric.ctgCostUs, pRequest->metric.analyseCostUs,
+               pRequest->metric.planCostUs, pRequest->metric.execCostUs);
       atomic_add_fetch_64((int64_t *)&pActivity->queryElapsedTime, duration);
     }
@@ -154,6 +155,11 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
   rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
   rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
 
+  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
+  connLimitNum = TMAX(connLimitNum, 10);
+  connLimitNum = TMIN(connLimitNum, 500);
+  rpcInit.connLimitNum = connLimitNum;
+
   void *pDnodeConn = rpcOpen(&rpcInit);
   if (pDnodeConn == NULL) {
     tscError("failed to init connection to server");
@@ -369,9 +375,9 @@ void doDestroyRequest(void *p) {
   }
 
   if (pRequest->syncQuery) {
-    if (pRequest->body.param){
-      tsem_destroy(&((SSyncQueryParam*)pRequest->body.param)->sem);
-     }
+    if (pRequest->body.param) {
+      tsem_destroy(&((SSyncQueryParam *)pRequest->body.param)->sem);
+    }
     taosMemoryFree(pRequest->body.param);
   }
 
@@ -398,20 +404,20 @@ static void *tscCrashReportThreadFp(void *param) {
   setThreadName("client-crashReport");
   char filepath[PATH_MAX] = {0};
   snprintf(filepath, sizeof(filepath), "%s%s.taosCrashLog", tsLogDir, TD_DIRSEP);
-  char *pMsg = NULL;
-  int64_t msgLen = 0;
+  char     *pMsg = NULL;
+  int64_t   msgLen = 0;
   TdFilePtr pFile = NULL;
-  bool truncateFile = false;
-  int32_t sleepTime = 200;
-  int32_t reportPeriodNum = 3600 * 1000 / sleepTime;
-  int32_t loopTimes = reportPeriodNum;
+  bool      truncateFile = false;
+  int32_t   sleepTime = 200;
+  int32_t   reportPeriodNum = 3600 * 1000 / sleepTime;
+  int32_t   loopTimes = reportPeriodNum;
 
 #ifdef WINDOWS
   if (taosCheckCurrentInDll()) {
     atexit(crashReportThreadFuncUnexpectedStopped);
   }
 #endif
-  
+
   while (1) {
     if (clientStop) break;
     if (loopTimes++ < reportPeriodNum) {
@@ -441,12 +447,12 @@ static void *tscCrashReportThreadFp(void *param) {
       pMsg = NULL;
       continue;
     }
-    
+
     if (pFile) {
       taosReleaseCrashLogFile(pFile, truncateFile);
       truncateFile = false;
     }
-    
+
     taosMsleep(sleepTime);
     loopTimes = 0;
   }
@@ -459,11 +465,11 @@ int32_t tscCrashReportInit() {
   if (!tsEnableCrashReport) {
     return 0;
   }
-  
+
   TdThreadAttr thAttr;
   taosThreadAttrInit(&thAttr);
   taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
-  TdThread crashReportThread;
+  TdThread crashReportThread;
   if (taosThreadCreate(&crashReportThread, &thAttr, tscCrashReportThreadFp, NULL) != 0) {
     tscError("failed to create crashReport thread since %s", strerror(errno));
     return -1;
@@ -488,26 +494,24 @@ void tscStopCrashReport() {
   }
 }
 
-
 void tscWriteCrashInfo(int signum, void *sigInfo, void *context) {
-  char *pMsg = NULL;
+  char       *pMsg = NULL;
   const char *flags = "UTL FATAL ";
   ELogLevel   level = DEBUG_FATAL;
   int32_t     dflag = 255;
-  int64_t msgLen= -1;
+  int64_t     msgLen = -1;
 
   if (tsEnableCrashReport) {
     if (taosGenCrashJsonMsg(signum, &pMsg, lastClusterId, appInfo.startTime)) {
       taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
     } else {
-      msgLen = strlen(pMsg);
+      msgLen = strlen(pMsg);
     }
   }
 
   taosLogCrashInfo("taos", pMsg, msgLen, signum, sigInfo);
 }
 
-
 void taos_init_imp(void) {
   // In the APIs of other program language, taos_cleanup is not available yet.
   // So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
@@ -561,7 +565,7 @@ void taos_init_imp(void) {
   taosThreadMutexInit(&appInfo.mutex, NULL);
 
   tscCrashReportInit();
-  
+
   tscDebug("client is initialized successfully");
 }
 
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index 079edcd6671bfc1afabd2aaabe1f6c03c48dbff7..c85e761c0b1c92669efbe71a704ddd44dad6826b 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -2008,6 +2008,11 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
   rpcInit.compressSize = tsCompressMsgSize;
   rpcInit.user = "_dnd";
 
+  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
+  connLimitNum = TMAX(connLimitNum, 10);
+  connLimitNum = TMIN(connLimitNum, 500);
+  rpcInit.connLimitNum = connLimitNum;
+
   clientRpc = rpcOpen(&rpcInit);
   if (clientRpc == NULL) {
     tscError("failed to init server status client");
diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c
index e3f08e912ab3bf955f5e1905439a317c4464e214..727663ba65583a7ac6f2d83206edf2753a4a576f 100644
--- a/source/common/src/tglobal.c
+++ b/source/common/src/tglobal.c
@@ -42,6 +42,7 @@ bool tsPrintAuth = false;
 // queue & threads
 int32_t tsNumOfRpcThreads = 1;
 int32_t tsNumOfRpcSessions = 2000;
+int32_t tsTimeToGetAvailableConn = 1000;
 int32_t tsNumOfCommitThreads = 2;
 int32_t tsNumOfTaskQueueThreads = 4;
 int32_t tsNumOfMnodeQueryThreads = 4;
@@ -326,6 +327,12 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
   if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
   if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
 
+  tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 100000);
+  if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
+
+  tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
+  if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1;
+
   tsNumOfTaskQueueThreads = tsNumOfCores / 2;
   tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
   if (tsNumOfTaskQueueThreads >= 10) {
@@ -395,6 +402,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
   tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
   if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
 
+  tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
+  if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1;
+
   tsNumOfCommitThreads = tsNumOfCores / 2;
   tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
   if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
@@ -515,6 +525,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
     pItem->stype = stype;
   }
 
+  pItem = cfgGetItem(tsCfg, "timeToGetAvailableConn");
+  if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
+    tsTimeToGetAvailableConn = 1000;
+    tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
+    pItem->i32 = tsTimeToGetAvailableConn;
+    pItem->stype = stype;
+  }
+
   pItem = cfgGetItem(tsCfg, "numOfCommitThreads");
   if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
     tsNumOfCommitThreads = numOfCores / 2;
@@ -696,6 +714,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
   tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
 
   tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
+
+  tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
+
+  tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
   return 0;
 }
 
@@ -732,7 +754,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
   tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
 
   tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
-  tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
+  tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
+  tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
+
   tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
   tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
   tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
@@ -740,7 +764,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
   tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
   tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
   tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
-  // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
+  // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
   tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
   tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
   tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h
index a41cc0068c3e906ffba65f4bad0feb8847665a55..843ca4515e2a7d5fc892e3fe300e6b6347a23eb4 100644
--- a/source/libs/transport/inc/transComm.h
+++ b/source/libs/transport/inc/transComm.h
@@ -147,7 +147,8 @@ typedef struct {
   int8_t  epsetRetryCnt;
   int32_t retryCode;
 
-  int hThrdIdx;
+  void* task;
+  int   hThrdIdx;
 } STransConnCtx;
 
 #pragma pack(push, 1)
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
index de5f9c26e0946d343c0bc1e0752783e5f155de0b..128d9b81623fd64ef0bb0408940c6963de165b8e 100644
--- a/source/libs/transport/src/transCli.c
+++ b/source/libs/transport/src/transCli.c
@@ -105,6 +105,7 @@ typedef struct SCliThrd {
   TdThreadMutex msgMtx;
   SDelayQueue*  delayQueue;
   SDelayQueue*  timeoutQueue;
+  SDelayQueue*  waitConnQueue;
   uint64_t      nextTimeout;  // next timeout
   void*         pTransInst;   //
 
@@ -614,8 +615,9 @@ static void addConnToPool(void* pool, SCliConn* conn) {
   if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) {
     queue* h = QUEUE_HEAD(&(*msglist)->msgQ);
     QUEUE_REMOVE(h);
-
     SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
+
+    transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
     transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
     transQueuePush(&conn->cliMsgs, pMsg);
     cliSend(conn);
@@ -1218,15 +1220,53 @@ void cliConnCb(uv_connect_t* req, int status) {
   }
 }
 
+static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
+  STransConnCtx* pCtx = pMsg->ctx;
+  STrans*        pTransInst = pThrd->pTransInst;
+
+  STransMsg transMsg = {0};
+  transMsg.contLen = 0;
+  transMsg.pCont = NULL;
+  transMsg.code = TSDB_CODE_RPC_MAX_SESSIONS;
+  transMsg.msgType = pMsg->msg.msgType + 1;
+  transMsg.info.ahandle = pMsg->ctx->ahandle;
+  transMsg.info.traceId = pMsg->msg.info.traceId;
+  transMsg.info.hasEpSet = false;
+  if (pCtx->pSem != NULL) {
+    if (pCtx->pRsp == NULL) {
+    } else {
+      memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
+    }
+  } else {
+    pTransInst->cfp(pTransInst->parent, &transMsg, NULL);
+  }
+
+  destroyCmsg(pMsg);
+}
 static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
   if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
     pThrd->stopMsg = pMsg;
     return;
   }
+  void** pIter = taosHashIterate(pThrd->connLimitCache, NULL);
+  while (pIter != NULL) {
+    SMsgList* list = (SMsgList*)(*pIter);
+    while (!QUEUE_IS_EMPTY(&list->msgQ)) {
+      queue* h = QUEUE_HEAD(&list->msgQ);
+      QUEUE_REMOVE(h);
+
+      SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
+      transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
+
+      doNotifyApp(pMsg, pThrd);
+    }
+    pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter);
+  }
   pThrd->stopMsg = NULL;
   pThrd->quit = true;
   tDebug("cli work thread %p start to quit", pThrd);
   destroyCmsg(pMsg);
+
   destroyConnPool(pThrd->pool);
   uv_walk(pThrd->loop, cliWalkCb, NULL);
 }
@@ -1353,7 +1393,19 @@ static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr) {
   }
   return 0;
 }
 
+static void doFreeTimeoutMsg(void* param) {
+  STaskArg* arg = param;
+  SCliMsg*  pMsg = arg->param1;
+  SCliThrd* pThrd = arg->param2;
+  STrans*   pTransInst = pThrd->pTransInst;
+
+  QUEUE_REMOVE(&pMsg->q);
+  STraceId* trace = &pMsg->msg.info.traceId;
+  tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
+  doNotifyApp(pMsg, pThrd);
+  taosMemoryFree(arg);
+}
 static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) {
   STrans* pTransInst = pThrd->pTransInst;
 
@@ -1363,6 +1415,15 @@ static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMs
   }
 
   if ((*list)->numOfConn >= pTransInst->connLimitNum) {
+    STraceId* trace = &pMsg->msg.info.traceId;
+
+    STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
+
+    arg->param1 = pMsg;
+    arg->param2 = pThrd;
+
+    pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, 200);
+    tGTrace("%s msg %s delay to send, wait for available connection", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
     QUEUE_PUSH(&(*list)->msgQ, &pMsg->q);
     return -1;
   }
@@ -1846,6 +1907,8 @@ static SCliThrd* createThrdObj(void* trans) {
 
   transDQCreate(pThrd->loop, &pThrd->timeoutQueue);
 
+  transDQCreate(pThrd->loop, &pThrd->waitConnQueue);
+
   pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
   pThrd->pTransInst = trans;
 
@@ -1872,6 +1935,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
 
   transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
   transDQDestroy(pThrd->timeoutQueue, NULL);
+  transDQDestroy(pThrd->waitConnQueue, NULL);
 
   tDebug("thread destroy %" PRId64, pThrd->pid);
   for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
@@ -1910,6 +1974,10 @@ static void destroyThrdObj(SCliThrd* pThrd) {
     QUEUE_REMOVE(h);
 
     SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
+
+    if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
+      pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
+    }
     destroyCmsg(pMsg);
   }
   taosMemoryFree(list);