提交 45a012bd 编写于 作者: dengyihao's avatar dengyihao

fix: limit session num

上级 28db4c8a
......@@ -14,9 +14,16 @@
#include "transComm.h"
typedef struct {
int32_t numOfConn;
queue msgQ;
} SMsgList;
typedef struct SConnList {
queue conns;
int32_t size;
queue conns;
int32_t size;
SMsgList* list;
void* pThrd;
} SConnList;
typedef struct {
......@@ -76,10 +83,6 @@ typedef struct SCliConn {
} SCliConn;
typedef struct {
int32_t numOfConn;
queue msgQ;
} SMsgList;
typedef struct SCliMsg {
STransConnCtx* ctx;
STransMsg msg;
......@@ -115,7 +118,6 @@ typedef struct SCliThrd {
SCvtAddr cvtAddr;
SHashObj* failFastCache;
SHashObj* connLimitCache;
SHashObj* batchCache;
SCliMsg* stopMsg;
......@@ -141,7 +143,7 @@ typedef struct {
// add expire timeout and capacity limit
static void* createConnPool(int size);
static void* destroyConnPool(void* pool);
static SCliConn* getConnFromPool(void* pool, char* addr);
static SCliConn* getConnFromPool(SCliThrd* thread, char* key);
static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param);
......@@ -181,8 +183,8 @@ static void cliSend(SCliConn* pConn);
static void cliSendBatch(SCliConn* pConn);
static void cliDestroyConnMsgs(SCliConn* conn, bool destroy);
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg);
static void doFreeTimeoutMsg(void* param);
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg** pMsg);
// cli util func
static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx);
......@@ -200,6 +202,7 @@ static void cliHandleExcept(SCliConn* conn);
static void cliReleaseUnfinishedMsg(SCliConn* conn);
static void cliHandleFastFail(SCliConn* pConn, int status);
static void doNotifyApp(SCliMsg* pMsg, SCliThrd* pThrd);
// handle req from app
static void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd);
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd);
......@@ -546,20 +549,38 @@ void* createConnPool(int size) {
}
void* destroyConnPool(void* pool) {
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
SCliThrd* pThrd = connList->pThrd;
while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conns)) {
queue* h = QUEUE_HEAD(&connList->conns);
SCliConn* c = QUEUE_DATA(h, SCliConn, q);
cliDestroyConn(c, true);
}
SMsgList* msglist = connList->list;
while (!QUEUE_IS_EMPTY(&msglist->msgQ)) {
queue* h = QUEUE_HEAD(&msglist->msgQ);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
pMsg->ctx->task = NULL;
doNotifyApp(pMsg, pThrd);
}
taosMemoryFree(msglist);
connList = taosHashIterate((SHashObj*)pool, connList);
}
taosHashCleanup(pool);
return NULL;
}
static SCliConn* getConnFromPool(void* pool, char* key) {
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key) {
void* pool = pThrd->pool;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
STrans* pTranInst = pThrd->pTransInst;
if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
......@@ -568,7 +589,76 @@ static SCliConn* getConnFromPool(void* pool, char* key) {
QUEUE_INIT(&plist->conns);
}
SMsgList* msglist = plist->list;
if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) {
return NULL;
}
plist->size -= 1;
queue* h = QUEUE_HEAD(&plist->conns);
SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
if (conn->task != NULL) {
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL;
}
return conn;
}
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
void* pool = pThrd->pool;
STrans* pTransInst = pThrd->pTransInst;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
if (plist == NULL) {
SConnList list = {0};
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
nList->numOfConn++;
QUEUE_INIT(&nList->msgQ);
list.list = nList;
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet((SHashObj*)pool, key, strlen(key));
QUEUE_INIT(&plist->conns);
}
SMsgList* list = plist->list;
// no avaliable conn in pool
if (QUEUE_IS_EMPTY(&plist->conns)) {
if ((list)->numOfConn >= pTransInst->connLimitNum) {
STraceId* trace = &(*pMsg)->msg.info.traceId;
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = *pMsg;
arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO((*pMsg)->msg.msgType));
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
*pMsg = NULL;
} else {
// send msg in delay queue
if (!(QUEUE_IS_EMPTY(&(list)->msgQ))) {
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = *pMsg;
arg->param2 = pThrd;
(*pMsg)->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
QUEUE_PUSH(&(list)->msgQ, &(*pMsg)->q);
queue* h = QUEUE_HEAD(&(list)->msgQ);
QUEUE_REMOVE(h);
SCliMsg* ans = QUEUE_DATA(h, SCliMsg, q);
*pMsg = ans;
transDQCancel(pThrd->waitConnQueue, ans->ctx->task);
}
list->numOfConn++;
}
return NULL;
}
......@@ -604,29 +694,30 @@ static void addConnToPool(void* pool, SCliConn* conn) {
cliDestroyConnMsgs(conn, false);
conn->status = ConnInPool;
SMsgList** msglist = taosHashGet(thrd->connLimitCache, conn->ip, strlen(conn->ip));
if (msglist != NULL && *msglist != NULL) {
if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) {
queue* h = QUEUE_HEAD(&(*msglist)->msgQ);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
conn->status = ConnNormal;
transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
cliSend(conn);
return;
}
}
if (conn->list == NULL) {
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
} else {
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
}
SConnList* pList = conn->list;
SMsgList* msgList = pList->list;
if (!QUEUE_IS_EMPTY(&msgList->msgQ)) {
queue* h = QUEUE_HEAD(&(msgList)->msgQ);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
transDQCancel(thrd->waitConnQueue, pMsg->ctx->task);
pMsg->ctx->task = NULL;
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
conn->status = ConnNormal;
cliSend(conn);
return;
}
conn->status = ConnInPool;
QUEUE_PUSH(&conn->list->conns, &conn->q);
conn->list->size += 1;
......@@ -752,8 +843,19 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) {
static void cliDestroyConn(SCliConn* conn, bool clear) {
SCliThrd* pThrd = conn->hostThrd;
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q);
if (conn->list != NULL) {
SConnList* connList = conn->list;
connList->list->numOfConn--;
} else {
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
connList->list->numOfConn--;
}
conn->list = NULL;
transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1;
......@@ -788,10 +890,6 @@ static void cliDestroy(uv_handle_t* handle) {
conn->timer->data = NULL;
conn->timer = NULL;
}
SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
if (list != NULL && *list != NULL) {
(*list)->numOfConn--;
}
atomic_sub_fetch_32(&pThrd->connCount, 1);
......@@ -1027,9 +1125,9 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
char key[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port);
SCliConn* conn = getConnFromPool(pThrd->pool, key);
SCliConn* conn = getConnFromPool(pThrd, key);
if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, key)) {
if (conn == NULL) {
tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen,
pBatch->batchSize);
cliDestroyBatch(pBatch);
......@@ -1085,16 +1183,6 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) {
cliHandleFastFail(conn, -1);
return;
}
SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
if (list == NULL || *list == NULL) {
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
nList->numOfConn++;
QUEUE_INIT(&nList->msgQ);
taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*));
} else {
(*list)->numOfConn++;
}
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
return;
}
......@@ -1246,20 +1334,6 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
pThrd->stopMsg = pMsg;
return;
}
void** pIter = taosHashIterate(pThrd->connLimitCache, NULL);
while (pIter != NULL) {
SMsgList* list = (SMsgList*)(*pIter);
while (!QUEUE_IS_EMPTY(&list->msgQ)) {
queue* h = QUEUE_HEAD(&list->msgQ);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
transDQCancel(pThrd->waitConnQueue, pMsg->ctx->task);
doNotifyApp(pMsg, pThrd);
}
pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter);
}
pThrd->stopMsg = NULL;
pThrd->quit = true;
tDebug("cli work thread %p start to quit", pThrd);
......@@ -1298,11 +1372,11 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) {
destroyCmsg(pMsg);
}
SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
STransConnCtx* pCtx = pMsg->ctx;
SCliConn* cliGetConn(SCliMsg** pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
STransConnCtx* pCtx = (*pMsg)->ctx;
SCliConn* conn = NULL;
int64_t refId = (int64_t)(pMsg->msg.info.handle);
int64_t refId = (int64_t)((*pMsg)->msg.info.handle);
if (refId != 0) {
SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) {
......@@ -1312,7 +1386,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
} else {
conn = exh->handle;
if (conn == NULL) {
conn = getConnFromPool(pThrd->pool, addr);
conn = getConnFromPool2(pThrd, addr, pMsg);
if (conn != NULL) specifyConnRef(conn, true, refId);
}
transReleaseExHandle(transGetRefMgt(), refId);
......@@ -1320,7 +1394,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) {
return conn;
};
conn = getConnFromPool(pThrd->pool, addr);
conn = getConnFromPool2(pThrd, addr, pMsg);
if (conn != NULL) {
tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool);
} else {
......@@ -1378,19 +1452,6 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) {
return;
}
static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr) {
STrans* pTransInst = pThrd->pTransInst;
SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr));
if (list == NULL || *list == NULL) {
return 0;
}
if ((*list)->numOfConn >= pTransInst->connLimitNum) {
return -1;
}
return 0;
}
static void doFreeTimeoutMsg(void* param) {
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
......@@ -1403,48 +1464,24 @@ static void doFreeTimeoutMsg(void* param) {
doNotifyApp(pMsg, pThrd);
taosMemoryFree(arg);
}
static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) {
STrans* pTransInst = pThrd->pTransInst;
SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr));
if (list == NULL || *list == NULL) {
return 0;
}
if ((*list)->numOfConn >= pTransInst->connLimitNum) {
STraceId* trace = &pMsg->msg.info.traceId;
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg;
arg->param2 = pThrd;
pMsg->ctx->task = transDQSched(pThrd->waitConnQueue, doFreeTimeoutMsg, arg, pTransInst->timeToGetConn);
tGTrace("%s msg %s delay to send, wait for avaiable connect", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
QUEUE_PUSH(&(*list)->msgQ, &pMsg->q);
return -1;
}
return 0;
}
void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx;
STraceId* trace = &pMsg->msg.info.traceId;
STrans* pTransInst = pThrd->pTransInst;
STraceId* trace = &pMsg->msg.info.traceId;
cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pCtx->epSet)) {
cliMayCvtFqdnToIp(&pMsg->ctx->epSet, &pThrd->cvtAddr);
if (!EPSET_IS_VALID(&pMsg->ctx->epSet)) {
tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType));
destroyCmsg(pMsg);
return;
}
char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet);
char* fqdn = EPSET_GET_INUSE_IP(&pMsg->ctx->epSet);
uint16_t port = EPSET_GET_INUSE_PORT(&pMsg->ctx->epSet);
char addr[TSDB_FQDN_LEN + 64] = {0};
CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port);
bool ignore = false;
SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore, addr);
SCliConn* conn = cliGetConn(&pMsg, pThrd, &ignore, addr);
if (ignore == true) {
// persist conn already release by server
STransMsg resp;
......@@ -1455,12 +1492,12 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
destroyCmsg(pMsg);
return;
}
if (conn == NULL && cliPreCheckSessionLimitForMsg(pThrd, addr, pMsg) != 0) {
if (conn == NULL && pMsg == NULL) {
return;
}
if (conn != NULL) {
transCtxMerge(&conn->ctx, &pCtx->appCtx);
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
cliSend(conn);
} else {
......@@ -1469,7 +1506,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
int64_t refId = (int64_t)pMsg->msg.info.handle;
if (refId != 0) specifyConnRef(conn, true, refId);
transCtxMerge(&conn->ctx, &pCtx->appCtx);
transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx);
transQueuePush(&conn->cliMsgs, pMsg);
conn->ip = strdup(addr);
......@@ -1521,17 +1558,6 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
cliHandleFastFail(conn, ret);
return;
}
SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip));
if (list == NULL || *list == NULL) {
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
nList->numOfConn++;
QUEUE_INIT(&nList->msgQ);
taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*));
} else {
(*list)->numOfConn++;
}
uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0);
}
tGTrace("%s conn %p ready", pTransInst->label, conn);
......@@ -1914,7 +1940,6 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
......@@ -1964,27 +1989,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
pIter = (void**)taosHashIterate(pThrd->batchCache, pIter);
}
taosHashCleanup(pThrd->batchCache);
pIter = taosHashIterate(pThrd->connLimitCache, NULL);
while (pIter != NULL) {
SMsgList* list = (SMsgList*)(*pIter);
while (!QUEUE_IS_EMPTY(&list->msgQ)) {
queue* h = QUEUE_HEAD(&list->msgQ);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
}
destroyCmsg(pMsg);
}
taosMemoryFree(list);
pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter);
}
taosHashCleanup(pThrd->connLimitCache);
taosMemoryFree(pThrd);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册