提交 f22d0731 编写于 作者: dengyihao's avatar dengyihao

handle too many session

上级 b8dfc714
...@@ -112,7 +112,8 @@ typedef struct SRpcInit { ...@@ -112,7 +112,8 @@ typedef struct SRpcInit {
// fail fast fp // fail fast fp
RpcFFfp ffp; RpcFFfp ffp;
int32_t connLimit; int32_t connLimitNum;
int32_t connLimitLock;
void *parent; void *parent;
} SRpcInit; } SRpcInit;
......
...@@ -284,7 +284,8 @@ int32_t dmInitClient(SDnode *pDnode) { ...@@ -284,7 +284,8 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.failFastThreshold = 3; // failed threshold rpcInit.failFastThreshold = 3; // failed threshold
rpcInit.ffp = dmFailFastFp; rpcInit.ffp = dmFailFastFp;
rpcInit.connLimit = 3000; rpcInit.connLimitNum = 3000;
rpcInit.connLimitLock = 1;
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {
......
...@@ -64,7 +64,9 @@ typedef struct { ...@@ -64,7 +64,9 @@ typedef struct {
void (*destroyFp)(void* ahandle); void (*destroyFp)(void* ahandle);
bool (*failFastFp)(tmsg_t msgType); bool (*failFastFp)(tmsg_t msgType);
int32_t connLimit; int32_t connLimitNum;
int8_t connLimitLock; // 0: no lock. 1. lock
int index; int index;
void* parent; void* parent;
void* tcphandle; // returned handle from TCP initialization void* tcphandle; // returned handle from TCP initialization
......
...@@ -67,7 +67,8 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -67,7 +67,8 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->startTimer = pInit->tfp; pRpc->startTimer = pInit->tfp;
pRpc->destroyFp = pInit->dfp; pRpc->destroyFp = pInit->dfp;
pRpc->failFastFp = pInit->ffp; pRpc->failFastFp = pInit->ffp;
pRpc->connLimit = pInit->connLimit; pRpc->connLimitNum = pInit->connLimitNum;
pRpc->connLimitLock = pInit->connLimitLock;
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
if (pRpc->numOfThreads <= 0) { if (pRpc->numOfThreads <= 0) {
......
...@@ -85,7 +85,7 @@ typedef struct SCliThrd { ...@@ -85,7 +85,7 @@ typedef struct SCliThrd {
SCvtAddr cvtAddr; SCvtAddr cvtAddr;
SHashObj* failFastCache; SHashObj* failFastCache;
SHashObj* connLimit; SHashObj* connLimitCache;
SCliMsg* stopMsg; SCliMsg* stopMsg;
...@@ -750,9 +750,9 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -750,9 +750,9 @@ static void cliDestroy(uv_handle_t* handle) {
transReqQueueClear(&conn->wreqQueue); transReqQueueClear(&conn->wreqQueue);
transDestroyBuffer(&conn->readBuf); transDestroyBuffer(&conn->readBuf);
int32_t* oVal = taosHashGet(pThrd->connLimit, conn->ip, strlen(conn->ip)); int32_t* oVal = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1; int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1;
taosHashPut(pThrd->connLimit, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal)); taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal));
taosMemoryFree(conn); taosMemoryFree(conn);
} }
...@@ -930,9 +930,9 @@ void cliConnCb(uv_connect_t* req, int status) { ...@@ -930,9 +930,9 @@ void cliConnCb(uv_connect_t* req, int status) {
return; return;
} }
int32_t* oVal = taosHashGet(pThrd->connLimit, pConn->ip, strlen(pConn->ip)); int32_t* oVal = taosHashGet(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip));
int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1; int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1;
taosHashPut(pThrd->connLimit, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal)); taosHashPut(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal));
struct sockaddr peername, sockname; struct sockaddr peername, sockname;
int addrlen = sizeof(peername); int addrlen = sizeof(peername);
...@@ -1080,10 +1080,10 @@ static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, SCliMsg* pMsg) { ...@@ -1080,10 +1080,10 @@ static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, SCliMsg* pMsg) {
char key[TSDB_FQDN_LEN + 64] = {0}; char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port); CONN_CONSTRUCT_HASH_KEY(key, ip, port);
int32_t* val = taosHashGet(pThrd->connLimit, key, strlen(key)); int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
if (val == NULL) return 0; if (val == NULL) return 0;
if (*val >= pTransInst->connLimit) { if (*val >= pTransInst->connLimitNum) {
return -1; return -1;
} }
return 0; return 0;
...@@ -1441,7 +1441,8 @@ static SCliThrd* createThrdObj(void* trans) { ...@@ -1441,7 +1441,8 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->connLimit = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK);
pThrd->quit = false; pThrd->quit = false;
return pThrd; return pThrd;
...@@ -1470,7 +1471,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1470,7 +1471,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache); taosHashCleanup(pThrd->failFastCache);
taosHashCleanup(pThrd->connLimit); taosHashCleanup(pThrd->connLimitCache);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
...@@ -1894,6 +1895,19 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran ...@@ -1894,6 +1895,19 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK; return TSDB_CODE_RPC_BROKEN_LINK;
} }
if (pTransInst->connLimitNum > 0 && REQUEST_NO_RESP(pReq)) {
char key[TSDB_FQDN_LEN + 64] = {0};
char* ip = EPSET_GET_INUSE_IP((SEpSet*)pEpSet);
uint16_t port = EPSET_GET_INUSE_PORT((SEpSet*)pEpSet);
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key));
if (val != NULL && *val >= pTransInst->connLimitNum) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return TSDB_CODE_RPC_BROKEN_LINK;
}
}
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64()); TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册