diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 5d6751a260ba5ed0fe1882e4e1a0f0c3c6f8cd22..de5f9c26e0946d343c0bc1e0752783e5f155de0b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -76,6 +76,10 @@ typedef struct SCliConn { } SCliConn; +typedef struct { + int32_t numOfConn; + queue msgQ; +} SMsgList; typedef struct SCliMsg { STransConnCtx* ctx; STransMsg msg; @@ -136,7 +140,7 @@ typedef struct { // add expire timeout and capacity limit static void* createConnPool(int size); static void* destroyConnPool(void* pool); -static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); +static SCliConn* getConnFromPool(void* pool, char* addr); static void addConnToPool(void* pool, SCliConn* conn); static void doCloseIdleConn(void* param); @@ -176,7 +180,8 @@ static void cliSend(SCliConn* pConn); static void cliSendBatch(SCliConn* pConn); static void cliDestroyConnMsgs(SCliConn* conn, bool destroy); -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port); +static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr); +static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg); // cli util func static FORCE_INLINE bool cliIsEpsetUpdated(int32_t code, STransConnCtx* pCtx); @@ -556,10 +561,7 @@ void* destroyConnPool(void* pool) { return NULL; } -static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - +static SCliConn* getConnFromPool(void* pool, char* key) { SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); if (plist == NULL) { SConnList list = {0}; @@ -607,6 +609,20 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->status = ConnInPool; + SMsgList** msglist = taosHashGet(thrd->connLimitCache, conn->ip, strlen(conn->ip)); + if (msglist != NULL && *msglist != NULL) { + if (!QUEUE_IS_EMPTY(&(*msglist)->msgQ)) { + queue* h = QUEUE_HEAD(&(*msglist)->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + transCtxMerge(&conn->ctx, &pMsg->ctx->appCtx); + transQueuePush(&conn->cliMsgs, pMsg); + cliSend(conn); + return; + } + } + if (conn->list == NULL) { tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip)); @@ -774,9 +790,10 @@ static void cliDestroy(uv_handle_t* handle) { conn->timer->data = NULL; conn->timer = NULL; } - int32_t* oVal = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); - int32_t nVal = oVal == NULL ? 0 : (*oVal) - 1; - taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nVal, sizeof(nVal)); + SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); + if (list != NULL && *list != NULL) { + (*list)->numOfConn--; + } atomic_sub_fetch_32(&pThrd->connCount, 1); @@ -1009,9 +1026,12 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; SCliBatchList* pList = pBatch->pList; - SCliConn* conn = getConnFromPool(pThrd->pool, pList->ip, pList->port); + char key[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, pList->ip, pList->port); + + SCliConn* conn = getConnFromPool(pThrd->pool, key); - if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, pList->ip, pList->port)) { + if (conn == NULL && 0 != cliPreCheckSessionLimit(pThrd, key)) { tError("%s failed to send batch msg, batch size:%d, msgLen: %d", pTransInst->label, pBatch->wLen, pBatch->batchSize); cliDestroyBatch(pBatch); @@ -1067,6 +1087,14 @@ static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd) { cliHandleFastFail(conn, -1); return; } + + SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); + if (list == NULL || *list == NULL) { + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + nList->numOfConn++; + QUEUE_INIT(&nList->msgQ); + taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); + } uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); return; } @@ -1173,10 +1201,6 @@ void cliConnCb(uv_connect_t* req, int status) { return; } - int32_t* oVal = taosHashGet(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip)); - int32_t nVal = oVal == NULL ? 0 : (*oVal) + 1; - taosHashPut(pThrd->connLimitCache, pConn->ip, strlen(pConn->ip), &nVal, sizeof(nVal)); - struct sockaddr peername, sockname; int addrlen = sizeof(peername); uv_tcp_getpeername((uv_tcp_t*)pConn->stream, &peername, &addrlen); @@ -1236,7 +1260,7 @@ static void cliHandleUpdate(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); } -SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { +SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore, char* addr) { STransConnCtx* pCtx = pMsg->ctx; SCliConn* conn = NULL; @@ -1250,7 +1274,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { } else { conn = exh->handle; if (conn == NULL) { - conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); + conn = getConnFromPool(pThrd->pool, addr); if (conn != NULL) specifyConnRef(conn, true, refId); } transReleaseExHandle(transGetRefMgt(), refId); @@ -1258,7 +1282,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { return conn; }; - conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); + conn = getConnFromPool(pThrd->pool, addr); if (conn != NULL) { tTrace("%s conn %p get from conn pool:%p", CONN_GET_INST_LABEL(conn), conn, pThrd->pool); } else { @@ -1316,20 +1340,30 @@ static FORCE_INLINE void cliUpdateFqdnCache(SHashObj* cache, char* fqdn) { return; } -static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) { +static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* addr) { STrans* pTransInst = pThrd->pTransInst; - // STransConnCtx* pCtx = pMsg->ctx; - // char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - // int32_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr)); + if (list == NULL || *list == NULL) { + return 0; + } - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); + if ((*list)->numOfConn >= pTransInst->connLimitNum) { + return -1; + } + return 0; +} - int32_t* val = taosHashGet(pThrd->connLimitCache, key, strlen(key)); - if (val == NULL) return 0; +static int32_t cliPreCheckSessionLimitForMsg(SCliThrd* pThrd, char* addr, SCliMsg* pMsg) { + STrans* pTransInst = pThrd->pTransInst; + + SMsgList** list = taosHashGet(pThrd->connLimitCache, addr, strlen(addr)); + if (list == NULL || *list == NULL) { + return 0; + } - if (*val >= pTransInst->connLimitNum) { + if ((*list)->numOfConn >= pTransInst->connLimitNum) { + QUEUE_PUSH(&(*list)->msgQ, &pMsg->q); return -1; } return 0; @@ -1337,36 +1371,22 @@ static int32_t cliPreCheckSessionLimit(SCliThrd* pThrd, char* ip, uint16_t port) void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { STrans* pTransInst = pThrd->pTransInst; STransConnCtx* pCtx = pMsg->ctx; + STraceId* trace = &pMsg->msg.info.traceId; cliMayCvtFqdnToIp(&pCtx->epSet, &pThrd->cvtAddr); - STraceId* trace = &pMsg->msg.info.traceId; - char* ip = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - if (!EPSET_IS_VALID(&pCtx->epSet)) { tGError("%s, msg %s sent with invalid epset", pTransInst->label, TMSG_INFO(pMsg->msg.msgType)); destroyCmsg(pMsg); return; } - if (REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - char key[TSDB_FQDN_LEN + 64] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, ip, port); - - SFailFastItem* item = taosHashGet(pThrd->failFastCache, key, strlen(key)); - if (item != NULL) { - int32_t elapse = (int32_t)(taosGetTimestampMs() - item->timestamp); - if (item->count >= pTransInst->failFastThreshold && (elapse >= 0 && elapse <= pTransInst->failFastInterval)) { - tGTrace("%s, msg %s cancel to send, reason: failed to connect %s:%d: count: %d, at %d", pTransInst->label, - TMSG_INFO(pMsg->msg.msgType), ip, port, item->count, elapse); - destroyCmsg(pMsg); - return; - } - } - } + char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet); + uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); + char addr[TSDB_FQDN_LEN + 64] = {0}; + CONN_CONSTRUCT_HASH_KEY(addr, fqdn, port); bool ignore = false; - SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); + SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore, addr); if (ignore == true) { // persist conn already release by server STransMsg resp; @@ -1377,11 +1397,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { destroyCmsg(pMsg); return; } - - if (conn == NULL && REQUEST_NO_RESP(&pMsg->msg) && 0 != cliPreCheckSessionLimit(pThrd, ip, port)) { - tGTrace("%s, msg %s cancel to send, reason: %s", pTransInst->label, TMSG_INFO(pMsg->msg.msgType), - tstrerror(TSDB_CODE_RPC_MAX_SESSIONS)); - destroyCmsg(pMsg); + if (conn == NULL && cliPreCheckSessionLimitForMsg(pThrd, addr, pMsg) != 0) { return; } @@ -1398,13 +1414,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); - char key[TSDB_FQDN_LEN + 64] = {0}; - char* fqdn = EPSET_GET_INUSE_IP(&pCtx->epSet); - uint16_t port = EPSET_GET_INUSE_PORT(&pCtx->epSet); - CONN_CONSTRUCT_HASH_KEY(key, fqdn, port); - - conn->ip = strdup(key); - + conn->ip = strdup(addr); uint32_t ipaddr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, fqdn); if (ipaddr == 0xffffffff) { uv_timer_stop(conn->timer); @@ -1453,6 +1463,15 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { cliHandleFastFail(conn, ret); return; } + + SMsgList** list = taosHashGet(pThrd->connLimitCache, conn->ip, strlen(conn->ip)); + if (list == NULL || *list == NULL) { + SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); + nList->numOfConn++; + QUEUE_INIT(&nList->msgQ); + taosHashPut(pThrd->connLimitCache, conn->ip, strlen(conn->ip), &nList, sizeof(void*)); + } + uv_timer_start(conn->timer, cliConnTimeout, TRANS_CONN_TIMEOUT, 0); } tGTrace("%s conn %p ready", pTransInst->label, conn); @@ -1833,8 +1852,7 @@ static SCliThrd* createThrdObj(void* trans) { pThrd->destroyAhandleFp = pTransInst->destroyFp; pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->failFastCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, - pTransInst->connLimitLock == 0 ? HASH_NO_LOCK : HASH_ENTRY_LOCK); + pThrd->connLimitCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->batchCache = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -1865,7 +1883,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { taosMemoryFree(pThrd->loop); taosHashCleanup(pThrd->fqdn2ipCache); taosHashCleanup(pThrd->failFastCache); - taosHashCleanup(pThrd->connLimitCache); void** pIter = taosHashIterate(pThrd->batchCache, NULL); while (pIter != NULL) { @@ -1884,6 +1901,23 @@ static void destroyThrdObj(SCliThrd* pThrd) { pIter = (void**)taosHashIterate(pThrd->batchCache, pIter); } taosHashCleanup(pThrd->batchCache); + + pIter = taosHashIterate(pThrd->connLimitCache, NULL); + while (pIter != NULL) { + SMsgList* list = (SMsgList*)(*pIter); + while (!QUEUE_IS_EMPTY(&list->msgQ)) { + queue* h = QUEUE_HEAD(&list->msgQ); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + destroyCmsg(pMsg); + } + taosMemoryFree(list); + + pIter = (void**)taosHashIterate(pThrd->connLimitCache, pIter); + } + taosHashCleanup(pThrd->connLimitCache); + taosMemoryFree(pThrd); }