提交 3bf28e18 编写于 作者: dengyihao's avatar dengyihao

fix: limit session num

上级 be17aa82
...@@ -23,7 +23,6 @@ typedef struct SConnList { ...@@ -23,7 +23,6 @@ typedef struct SConnList {
queue conns; queue conns;
int32_t size; int32_t size;
SMsgList* list; SMsgList* list;
void* pThrd;
} SConnList; } SConnList;
typedef struct { typedef struct {
...@@ -142,7 +141,7 @@ typedef struct { ...@@ -142,7 +141,7 @@ typedef struct {
// conn pool // conn pool
// add expire timeout and capacity limit // add expire timeout and capacity limit
static void* createConnPool(int size); static void* createConnPool(int size);
static void* destroyConnPool(void* pool); static void* destroyConnPool(SCliThrd* thread);
static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed); static SCliConn* getConnFromPool(SCliThrd* thread, char* key, bool* exceed);
static void addConnToPool(void* pool, SCliConn* conn); static void addConnToPool(void* pool, SCliConn* conn);
static void doCloseIdleConn(void* param); static void doCloseIdleConn(void* param);
...@@ -547,9 +546,9 @@ void* createConnPool(int size) { ...@@ -547,9 +546,9 @@ void* createConnPool(int size) {
// thread local, no lock // thread local, no lock
return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
} }
void* destroyConnPool(void* pool) { void* destroyConnPool(SCliThrd* pThrd) {
void* pool = pThrd->pool;
SConnList* connList = taosHashIterate((SHashObj*)pool, NULL); SConnList* connList = taosHashIterate((SHashObj*)pool, NULL);
SCliThrd* pThrd = connList->pThrd;
while (connList != NULL) { while (connList != NULL) {
while (!QUEUE_IS_EMPTY(&connList->conns)) { while (!QUEUE_IS_EMPTY(&connList->conns)) {
queue* h = QUEUE_HEAD(&connList->conns); queue* h = QUEUE_HEAD(&connList->conns);
...@@ -582,29 +581,31 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { ...@@ -582,29 +581,31 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
STrans* pTranInst = pThrd->pTransInst; STrans* pTranInst = pThrd->pTransInst;
if (plist == NULL) { if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key));
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ); QUEUE_INIT(&nList->msgQ);
nList->numOfConn++; nList->numOfConn++;
SConnList list = {0}; QUEUE_INIT(&plist->conns);
QUEUE_INIT(&list.conns); plist->list = nList;
list.list = nList;
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
return NULL;
} }
SMsgList* msglist = plist->list; if (QUEUE_IS_EMPTY(&plist->conns)) {
if (QUEUE_IS_EMPTY(&plist->conns) && msglist->numOfConn >= pTranInst->connLimitNum) { if (plist->list->numOfConn >= pTranInst->connLimitNum) {
*exceed = true; *exceed = true;
}
return NULL; return NULL;
} }
plist->size -= 1;
queue* h = QUEUE_HEAD(&plist->conns); queue* h = QUEUE_HEAD(&plist->conns);
QUEUE_REMOVE(h);
plist->size -= 1;
SCliConn* conn = QUEUE_DATA(h, SCliConn, q); SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal; conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
if (conn->task != NULL) { if (conn->task != NULL) {
...@@ -619,23 +620,21 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { ...@@ -619,23 +620,21 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
STrans* pTransInst = pThrd->pTransInst; STrans* pTransInst = pThrd->pTransInst;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
if (plist == NULL) { if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key));
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ); QUEUE_INIT(&nList->msgQ);
nList->numOfConn++; nList->numOfConn++;
SConnList list = {0}; QUEUE_INIT(&plist->conns);
QUEUE_INIT(&list.conns); plist->list = nList;
list.list = nList;
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet((SHashObj*)pool, key, strlen(key));
return NULL;
} }
SMsgList* list = plist->list;
// no avaliable conn in pool // no avaliable conn in pool
if (QUEUE_IS_EMPTY(&plist->conns)) { if (QUEUE_IS_EMPTY(&plist->conns)) {
SMsgList* list = plist->list;
if ((list)->numOfConn >= pTransInst->connLimitNum) { if ((list)->numOfConn >= pTransInst->connLimitNum) {
STraceId* trace = &(*pMsg)->msg.info.traceId; STraceId* trace = &(*pMsg)->msg.info.traceId;
...@@ -669,11 +668,12 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { ...@@ -669,11 +668,12 @@ static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
return NULL; return NULL;
} }
plist->size -= 1;
queue* h = QUEUE_HEAD(&plist->conns); queue* h = QUEUE_HEAD(&plist->conns);
plist->size -= 1;
QUEUE_REMOVE(h);
SCliConn* conn = QUEUE_DATA(h, SCliConn, q); SCliConn* conn = QUEUE_DATA(h, SCliConn, q);
conn->status = ConnNormal; conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
if (conn->task != NULL) { if (conn->task != NULL) {
...@@ -686,6 +686,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -686,6 +686,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) { if (conn->status == ConnInPool) {
return; return;
} }
tError("add conn to pool");
allocConnRef(conn, true); allocConnRef(conn, true);
SCliThrd* thrd = conn->hostThrd; SCliThrd* thrd = conn->hostThrd;
...@@ -1347,7 +1348,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1347,7 +1348,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
tDebug("cli work thread %p start to quit", pThrd); tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg); destroyCmsg(pMsg);
destroyConnPool(pThrd->pool); destroyConnPool(pThrd);
uv_walk(pThrd->loop, cliWalkCb, NULL); uv_walk(pThrd->loop, cliWalkCb, NULL);
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册