未验证 提交 41c5ba29 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20154 from taosdata/fix/toManySeesion

fix: to many seesion
......@@ -50,6 +50,7 @@ extern int32_t tsTagFilterResCacheSize;
// queue & threads
extern int32_t tsNumOfRpcThreads;
extern int32_t tsNumOfRpcSessions;
extern int32_t tsTimeToGetAvailableConn;
extern int32_t tsNumOfCommitThreads;
extern int32_t tsNumOfTaskQueueThreads;
extern int32_t tsNumOfMnodeQueryThreads;
......
......@@ -114,7 +114,7 @@ typedef struct SRpcInit {
int32_t connLimitNum;
int32_t connLimitLock;
int32_t timeToGetConn;
int8_t supportBatch; // 0: no batch, 1. batch
int32_t batchSize;
void *parent;
......
......@@ -144,7 +144,7 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0;
rpcInit.label = "TSC";
rpcInit.numOfThreads = numOfThread;
rpcInit.numOfThreads = tsNumOfRpcThreads;
rpcInit.cfp = processMsgFromServer;
rpcInit.rfp = clientRpcRfp;
rpcInit.sessions = 1024;
......@@ -159,6 +159,12 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
rpcInit.retryMaxTimouet = tsMaxRetryWaitTime;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 5);
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) {
tscError("failed to init connection to server");
......@@ -517,7 +523,7 @@ void taos_init_imp(void) {
if (code) {
printf("failed to init memory dbg, error:%s\n", tstrerror(code));
} else {
tsAsyncLog = false;
tsAsyncLog = false;
printf("memory dbg enabled\n");
}
}
......
......@@ -2034,6 +2034,12 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit.compressSize = tsCompressMsgSize;
rpcInit.user = "_dnd";
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) {
tscError("failed to init server status client");
......
......@@ -41,7 +41,8 @@ bool tsPrintAuth = false;
// queue & threads
int32_t tsNumOfRpcThreads = 1;
int32_t tsNumOfRpcSessions = 2000;
int32_t tsNumOfRpcSessions = 5000;
int32_t tsTimeToGetAvailableConn = 100000;
int32_t tsNumOfCommitThreads = 2;
int32_t tsNumOfTaskQueueThreads = 4;
int32_t tsNumOfMnodeQueryThreads = 4;
......@@ -326,6 +327,12 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "useAdapter", tsUseAdapter, true) != 0) return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, true) != 0) return -1;
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 100000);
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
// Clamp the connection-wait timeout to the same [20, 1000000] range that the
// config item below is registered with (the clamp previously allowed 10000000,
// which exceeds the item's declared maximum).
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
if (tsNumOfTaskQueueThreads >= 10) {
......@@ -397,6 +404,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfRpcSessions = TRANGE(tsNumOfRpcSessions, 100, 10000);
if (cfgAddInt32(pCfg, "numOfRpcSessions", tsNumOfRpcSessions, 1, 100000, 0) != 0) return -1;
// Clamp the connection-wait timeout into the range accepted by the config item.
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
// Register the timeout itself as the item's default value; the session count
// (tsNumOfRpcSessions) was previously passed here by mistake.
if (cfgAddInt32(pCfg, "timeToGetAvailableConn", tsTimeToGetAvailableConn, 20, 1000000, 0) != 0) return -1;
tsNumOfCommitThreads = tsNumOfCores / 2;
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
......@@ -517,6 +527,14 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "timeToGetAvailableConn");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsTimeToGetAvailableConn = 1000;
tsTimeToGetAvailableConn = TRANGE(tsTimeToGetAvailableConn, 20, 1000000);
pItem->i32 = tsTimeToGetAvailableConn;
pItem->stype = stype;
}
pItem = cfgGetItem(tsCfg, "numOfCommitThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfCommitThreads = numOfCores / 2;
......@@ -698,6 +716,10 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
return 0;
}
......@@ -735,6 +757,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
tsTimeToGetAvailableConn = cfgGetItem(pCfg, "timeToGetAvailableConn")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
......@@ -742,7 +766,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeFetchThreads = cfgGetItem(pCfg, "numOfVnodeFetchThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchTereads")->i32;
tsNumOfSnodeStreamThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
tsNumOfSnodeWriteThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
......
......@@ -93,15 +93,15 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
break;
}
/*
pDnode is null, TD-22618
at trans.c line 91
before this line, dmProcessRpcMsg callback is set
after this line, parent is set
so when dmProcessRpcMsg is called, pDonde is still null.
*/
if (pDnode != NULL){
if(pDnode->status != DND_STAT_RUNNING) {
/*
pDnode may be NULL here (TD-22618).
At trans.c line 91 the dmProcessRpcMsg callback is registered before the
parent pointer is set, so if dmProcessRpcMsg is invoked in between,
pDnode is still NULL.
*/
if (pDnode != NULL) {
if (pDnode->status != DND_STAT_RUNNING) {
if (pRpc->msgType == TDMT_DND_SERVER_STATUS) {
dmProcessServerStartupStatus(pDnode, pRpc);
return;
......@@ -113,7 +113,7 @@ so when dmProcessRpcMsg is called, pDonde is still null.
}
goto _OVER;
}
}
}
} else {
terrno = TSDB_CODE_APP_IS_STARTING;
goto _OVER;
......@@ -304,6 +304,7 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.connLimitLock = 1;
rpcInit.supportBatch = 1;
rpcInit.batchSize = 8 * 1024;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) {
......
......@@ -606,9 +606,8 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
}
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_SYN_NOT_LEADER ||
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED ||
code == TSDB_CODE_SYN_RESTORING || code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING ||
code == TSDB_CODE_APP_IS_STOPPING) {
code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED || code == TSDB_CODE_SYN_RESTORING ||
code == TSDB_CODE_MNODE_NOT_FOUND || code == TSDB_CODE_APP_IS_STARTING || code == TSDB_CODE_APP_IS_STOPPING) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH) {
return false;
......@@ -673,6 +672,12 @@ int32_t udfdOpenClientRpc() {
rpcInit.rfp = udfdRpcRfp;
rpcInit.compressSize = tsCompressMsgSize;
int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
connLimitNum = TMAX(connLimitNum, 10);
connLimitNum = TMIN(connLimitNum, 500);
rpcInit.connLimitNum = connLimitNum;
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
......@@ -765,7 +770,7 @@ bool isUdfdUvMsgComplete(SUdfdUvConn *pipe) {
}
void udfdHandleRequest(SUdfdUvConn *conn) {
char *inputBuf = conn->inputBuf;
char *inputBuf = conn->inputBuf;
int32_t inputLen = conn->inputLen;
uv_work_t *work = taosMemoryMalloc(sizeof(uv_work_t));
......@@ -784,7 +789,7 @@ void udfdHandleRequest(SUdfdUvConn *conn) {
void udfdPipeCloseCb(uv_handle_t *pipe) {
SUdfdUvConn *conn = pipe->data;
SUvUdfWork* pWork = conn->pWorkList;
SUvUdfWork *pWork = conn->pWorkList;
while (pWork != NULL) {
pWork->conn = NULL;
pWork = pWork->pWorkNext;
......
......@@ -148,7 +148,8 @@ typedef struct {
int8_t epsetRetryCnt;
int32_t retryCode;
int hThrdIdx;
void* task;
int hThrdIdx;
} STransConnCtx;
#pragma pack(push, 1)
......
......@@ -64,11 +64,11 @@ typedef struct {
void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType);
int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock
int8_t supportBatch; // 0: no batch, 1: support batch
int32_t batchSize;
int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock
int8_t supportBatch; // 0: no batch, 1: support batch
int32_t batchSize;
int32_t timeToGetConn;
int index;
void* parent;
void* tcphandle; // returned handle from TCP initialization
......
......@@ -66,6 +66,10 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp;
pRpc->connLimitNum = pInit->connLimitNum;
if (pRpc->connLimitNum == 0) {
pRpc->connLimitNum = 20;
}
pRpc->connLimitLock = pInit->connLimitLock;
pRpc->supportBatch = pInit->supportBatch;
pRpc->batchSize = pInit->batchSize;
......@@ -90,7 +94,10 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->user) {
tstrncpy(pRpc->user, pInit->user, sizeof(pRpc->user));
}
pRpc->timeToGetConn = pInit->timeToGetConn;
if (pRpc->timeToGetConn == 0) {
pRpc->timeToGetConn = 10 * 1000;
}
pRpc->tcphandle =
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
......
......@@ -13,9 +13,15 @@
*/
#include "transComm.h"
/*
 * Bookkeeping shared by all connections to one peer address; referenced from
 * SConnList through its `list` member.
 */
typedef struct {
// Connection count for this address. It is compared against the transport's
// connLimitNum when the pool has no idle connection, incremented when a new
// connection is about to be opened, and decremented in cliDestroyConn.
int32_t numOfConn;
// Requests parked while waiting for a connection; addConnToPool hands the
// head request to a connection that is being returned, and destroyConnPool
// drains whatever is left.
queue msgQ;
} SMsgList;
typedef struct SConnList {
queue conns;
int32_t size;
queue conns;
int32_t size;
SMsgList* list;
} SConnList;
typedef struct {
......@@ -100,6 +106,7 @@ typedef struct SCliThrd {
TdThreadMutex msgMtx;
SDelayQueue* delayQueue;
SDelayQueue* timeoutQueue;
SDelayQueue* waitConnQueue;
uint64_t nextTimeout; // next timeout
void* pTransInst; //
......@@ -109,7 +116,6 @@ typedef struct SCliThrd {
SCvtAddr cvtAddr;
SHashObj* failFastCache;
SHashObj* connLimitCache;
SHashObj* batchCache;
SCliMsg* stopMsg;
......@@ -134,8 +140,8 @@ typedef struct {
// conn pool
// add expire timeout and capacity limit
static void* createConnPool(int size);
static void* destroyConnPool(void* pool);
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static void* destroyConnPool(SCliThrd* thread);
static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param);
......@@ -175,7 +181,8 @@ static void cliSend(SCliConn* pConn);
static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port);
static void doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
// cli util func
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
......@@ -193,6 +200,7 @@ static void cliHandleExcept(SCliConn* conn);
static void cliReleaseUnfinishedMsg(SCliConn* conn);
static void cliHandleFastFail(SCliConn* pConn, int status);
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
// handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
......@@ -333,12 +341,8 @@ bool cliMaySendCachedMsg(SCliConn* conn) {
if (!transQueueEmpty(&conn->cliMsgs)) {
SCliMsg* pCliMsg = NULL;
CONN_GET_NEXT_SENDMSG(conn);
if (pCliMsg == NULL)
return false;
else {
cliSend(conn);
return true;
}
cliSend(conn);
return true;
}
return false;
_RETURN:
......@@ -545,7 +549,8 @@ void* createConnPool(int size) {
// thread local, no lock
return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
}
void* destroyConnPool(void* pool) {
void* destroyConnPool(SCliThrd* pThrd) {
void* pool = pThrd->pool;
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conns)) {
......@@ -553,34 +558,130 @@ void* destroyConnPool(void* pool) {
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
cliDestroyConn(c, true);
}
SMsgList* msglist = connList->list;
while (!QUEUE_IS_EMPTY(&msglist->msgQ)) {
queue* h = QUEUE_HEAD(&msglist->msgQ);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
pMsg->ctx->task = NULL;
doNotifyApp(pMsg, pThrd);
}
taosMemoryFree(msglist);
connList = taosHashIterate((SHashObj*)pool, connList);
}
taosHashCleanup(pool);
return NULL;
}
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
/*
 * Take an idle connection for `key` out of the pool owned by `pThrd`.
 *
 * pThrd  - client thread whose pool is searched.
 * key    - NUL-terminated pool key identifying the peer.
 * exceed - set to true when no idle connection exists and the per-key
 *          connection count has reached the transport's connLimitNum;
 *          left untouched otherwise.
 *
 * Returns the connection removed from the pool, or NULL when none is idle.
 * A pool entry (and its SMsgList) is created on first use of `key`.
 *
 * NOTE(review): the results of taosHashGet (after the put) and
 * taosMemoryCalloc are used without a NULL check - confirm this is acceptable.
 * NOTE(review): unlike getConnFromPool2, numOfConn is not incremented here
 * when the pool is empty but still under the limit - confirm intended.
 */
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
  STrans*   pInst = pThrd->pTransInst;
  SHashObj* pPool = (SHashObj*)pThrd->pool;

  SConnList* pEntry = taosHashGet(pPool, key, strlen(key));
  if (pEntry == NULL) {
    // First request for this key: insert an empty entry, then initialise the
    // copy stored inside the hash table.
    SConnList blank = {0};
    taosHashPut(pPool, key, strlen(key), (void*)&blank, sizeof(blank));
    pEntry = taosHashGet(pPool, key, strlen(key));

    SMsgList* pWaiting = taosMemoryCalloc(1, sizeof(SMsgList));
    QUEUE_INIT(&pWaiting->msgQ);
    pWaiting->numOfConn++;

    QUEUE_INIT(&pEntry->conns);
    pEntry->list = pWaiting;
  }

  if (QUEUE_IS_EMPTY(&pEntry->conns)) {
    // Nothing idle; tell the caller whether the limit has been reached.
    if (pEntry->list->numOfConn >= pInst->connLimitNum) {
      *exceed = true;
    }
    return NULL;
  }

  // Detach the connection at the tail of the idle list.
  queue* pNode = QUEUE_TAIL(&pEntry->conns);
  QUEUE_REMOVE(pNode);
  pEntry->size -= 1;

  SCliConn* pConn = QUEUE_DATA(pNode, SCliConn, q);
  pConn->status = ConnNormal;
  QUEUE_INIT(&pConn->q);

  // Cancel the task pending on the host thread's timeoutQueue, if any.
  if (pConn->task != NULL) {
    transDQCancel(((SCliThrd*)pConn->hostThrd)->timeoutQueue, pConn->task);
    pConn->task = NULL;
  }
  return pConn;
}
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
void* pool = pThrd->pool;
STrans* pTransInst = pThrd->pTransInst;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet((SHashObj*)pool, key, strlen(key));
if (plist == NULL) return NULL;
plist = taosHashGet(pool, key, strlen(key));
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ);
nList->numOfConn++;
QUEUE_INIT(&plist->conns);
plist->list = nList;
}
STraceId* trace = &(*pMsg)->msg.info.traceId;
// no available conn in pool
if (QUEUE_IS_EMPTY(&plist->conns)) {
SMsgList* list = plist->list;
if ((list)->numOfConn >= pTransInst->connLimitNum) {
STraceId* trace = &(*pMsg)->msg.info.traceId;
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = *pMsg;
arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
*pMsg = NULL;
} else {
// send msg in delay queue
if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = *pMsg;
arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label,
TMSG_INFO((*pMsg)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
queue* h = QUEUE_HEAD(&(list)->msgQ);
QUEUE_REMOVE(h);
SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);
*pMsg = ans;
trace = &(*pMsg)->msg.info.traceId;
tGTrace("%s msg %s pop from delay queue, start to send", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
}
list->numOfConn++;
}
return NULL;
}
queue* h = QUEUE_TAIL(&plist->conns);
plist->size -= 1;
queue* h = QUEUE_HEAD(&plist->conns);
QUEUE_REMOVE(h);
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
if (conn->task != NULL) {
......@@ -608,18 +709,34 @@ static void addConnToPool(void* pool, SCliConn* conn) {
cliDestroyConnMsgs(conn, false);
conn->status = ConnInPool;
if (conn->list == NULL) {
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
} else {
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
}
SConnList* pList = conn->list;
SMsgList* msgList = pList->list;
if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
queue* h = QUEUE_HEAD(&(msgList)->msgQ);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
pMsg->ctx->task = NULL;
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
conn->status = ConnNormal;
cliSend(conn);
return;
}
conn->status = ConnInPool;
QUEUE_PUSH(&conn->list->conns, &conn->q);
conn->list->size += 1;
if (conn->list->size >= 250) {
if (conn->list->size >= 20) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
......@@ -741,8 +858,20 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
static void cliDestroyConn(SCliConn* conn, bool clear) {
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
if (conn->list != NULL) {
SConnList* connList = conn->list;
connList->list->numOfConn--;
connList->size--;
} else {
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
connList->list->numOfConn--;
}
conn->list = NULL;
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
......@@ -777,9 +906,6 @@ static void cliDestroy(uv_handle_t* handle) {
conn->timer->data = NULL;
conn->timer = NULL;
}
int32_t* oVal = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1;
taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal));
atomic_sub_fetch_32(&pThrd->connCount, 1);
......@@ -1012,11 +1138,15 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
SCliBatchList* pList = pBatch->pList;
SCliConn* conn = getConnFromPool(pThrd->pool, pList->ip, pList->port);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);
bool exceed = false;
SCliConn* conn = getConnFromPool(pThrd, key, &exceed);
if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pList->ip, pList->port)) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen,
pBatch->batchSize);
if (conn == NULL && exceed) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d, conn limit:%d", pTransInst->label, pBatch->wLen,
pBatch->batchSize, pTransInst->connLimitNum);
cliDestroyBatch(pBatch);
return;
}
......@@ -1176,10 +1306,6 @@ void cliConnCb(uv_connect_t* req, int status) {
return;
}
int32_t* oVal = taosHashGet(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip));
int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1;
taosHashPut(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal));
struct sockaddr peername, sockname;
int addrlen = sizeof(peername);
uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen);
......@@ -1197,6 +1323,29 @@ void cliConnCb(uv_connect_t* req, int status) {
}
}
/*
 * Fail a request with TSDB_CODE_RPC_MAX_SESSIONS and release it.
 *
 * An empty response is built from the request (message type + 1, same
 * ahandle and trace id). If the request context carries a semaphore, the
 * response is copied into the context's pRsp buffer when one is present;
 * otherwise it is delivered through the transport's cfp callback.
 *
 * pMsg  - request being failed; destroyed before returning.
 * pThrd - client thread whose pTransInst supplies the callback and parent.
 *
 * NOTE(review): the semaphore itself is not posted in this function - confirm
 * the waiter is woken elsewhere.
 */
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd) {
  STrans*        pInst = pThrd->pTransInst;
  STransConnCtx* pConnCtx = pMsg->ctx;

  STransMsg rsp = {0};
  rsp.code = TSDB_CODE_RPC_MAX_SESSIONS;
  rsp.msgType = pMsg->msg.msgType + 1;
  rsp.contLen = 0;
  rsp.pCont = NULL;
  rsp.info.hasEpSet = false;
  rsp.info.traceId = pMsg->msg.info.traceId;
  rsp.info.ahandle = pMsg->ctx->ahandle;

  if (pConnCtx->pSem == NULL) {
    // No semaphore: report through the registered callback.
    pInst->cfp(pInst->parent, &rsp, NULL);
  } else if (pConnCtx->pRsp != NULL) {
    // Semaphore present: fill the response buffer, if there is one.
    memcpy((char*)pConnCtx->pRsp, (char*)&rsp, sizeof(rsp));
  }

  destroyCmsg(pMsg);
}
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
pThrd->stopMsg = pMsg;
......@@ -1206,7 +1355,8 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
pThrd->quit = true;
tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg);
destroyConnPool(pThrd->pool);
destroyConnPool(pThrd);
uv_walk(pThrd->loop, cliWalkCb, NULL);
}
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
......@@ -1239,11 +1389,11 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
destroyCmsg(pMsg);
}
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
STransConnCtx* pCtx = pMsg->ctx;
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
STransConnCtx* pCtx = (*pMsg)->ctx;
SCliConn* conn = NULL;
int64_t refId = (int64_t)(pMsg->msg.info.handle);
int64_t refId = (int64_t)((*pMsg)->msg.info.handle);
if (refId != 0) {
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
......@@ -1253,7 +1403,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
} else {
conn = exh->handle;
if (conn == NULL) {
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
conn = getConnFromPool2(pThrd, addr, pMsg);
if (conn != NULL) specifyConnRef(conn, true, refId);
}
transReleaseExHandle(transGetRefMgt(), refId);
......@@ -1261,7 +1411,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
return conn;
};
conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet));
conn = getConnFromPool2(pThrd, addr, pMsg);
if (conn != NULL) {
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
} else {
......@@ -1319,57 +1469,34 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
return;
}
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) {
STrans* pTransInst = pThrd->pTransInst;
// STransConnCtx* pCtx = pMsg->ctx;
// char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
// int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
if (val == NULL) return 0;
static void doFreeTimeoutMsg(void* param) {
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
STrans* pTransInst = pThrd->pTransInst;
if (*val >= pTransInst->connLimitNum) {
return -1;
}
return 0;
QUEUE_REMOVE(&pMsg->q);
STraceId* trace = &pMsg->msg.info.traceId;
tGTrace("%s msg %s cannot get available conn after timeout", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
doNotifyApp(pMsg, pThrd);
taosMemoryFree(arg);
}
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
STraceId* trace = &pMsg->msg.info.traceId;
char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
STrans* pTransInst = pThrd->pTransInst;
if (!EPSET_IS_VALID(&pCtx->epSet)) {
tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
destroyCmsg(pMsg);
return;
}
if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key));
if (item != NULL) {
int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp);
if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) {
tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label,
TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse);
destroyCmsg(pMsg);
return;
}
}
}
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);
bool ignore = false;
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore);
SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr);
if (ignore == true) {
// persist conn already release by server
STransMsg resp;
......@@ -1380,16 +1507,13 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
destroyCmsg(pMsg);
return;
}
if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, ip, port)) {
tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType),
tstrerror(TSDB_CODE_RPC_MAX_SESSIONS));
destroyCmsg(pMsg);
if (conn == NULL && pMsg == NULL) {
return;
}
STraceId* trace = &pMsg->msg.info.traceId;
if (conn != NULL) {
transCtxMerge(&conn->ctx, &pCtx->appCtx);
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
cliSend(conn);
} else {
......@@ -1398,15 +1522,10 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
int64_t refId = (int64_t)pMsg->msg.info.handle;
if (refId != 0) specifyConnRef(conn, true, refId);
transCtxMerge(&conn->ctx, &pCtx->appCtx);
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
char key[TSDB_FQDN_LEN + 64] = {0};
char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
CONN_CONSTRUCT_HASH_KEY(key, fqdn, port);
conn->ip = taosStrdup(key);
conn->ip = taosStrdup(addr);
uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn);
if (ipaddr == 0xffffffff) {
......@@ -1830,14 +1949,14 @@ static SCliThrd* createThrdObj(void* trans) {
transDQCreate(pThrd->loop, &pThrd->timeoutQueue);
transDQCreate(pThrd->loop, &pThrd->waitConnQueue);
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
pThrd->pTransInst = trans;
pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK);
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
......@@ -1857,6 +1976,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
transDQDestroy(pThrd->timeoutQueue, NULL);
transDQDestroy(pThrd->waitConnQueue, NULL);
tDebug("thread destroy %" PRId64, pThrd->pid);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
......@@ -1868,7 +1988,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosHashCleanup(pThrd->connLimitCache);
void** pIter = taosHashIterate(pThrd->batchCache, NULL);
while (pIter != NULL) {
......
......@@ -21,7 +21,7 @@ static void shellWorkAsClient() {
SRpcInit rpcInit = {0};
SEpSet epSet = {.inUse = 0, .numOfEps = 1};
SRpcMsg rpcRsp = {0};
void * clientRpc = NULL;
void *clientRpc = NULL;
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)("_pwd"), strlen("_pwd"), pass);
......@@ -31,6 +31,7 @@ static void shellWorkAsClient() {
rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = "_dnd";
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册