diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 9dd1a745d38c57bdcd931a2ebb38be5507e3a6f2..3fa6344009b50ccb89f27b6d56ce9b24d282ed16 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -229,8 +229,8 @@ typedef struct { int8_t stop; } SAsyncPool; -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); -void transDestroyAsyncPool(SAsyncPool* pool); +SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); +void transAsyncPoolDestroy(SAsyncPool* pool); int transAsyncSend(SAsyncPool* pool, queue* mq); bool transAsyncPoolIsEmpty(SAsyncPool* pool); @@ -322,7 +322,7 @@ typedef struct STransReq { } STransReq; void transReqQueueInit(queue* q); -void* transReqQueuePushReq(queue* q); +void* transReqQueuePush(queue* q); void* transReqQueueRemove(void* arg); void transReqQueueClear(queue* q); @@ -393,9 +393,9 @@ typedef struct SDelayQueue { uv_loop_t* loop; } SDelayQueue; -int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); -void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); -int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); +int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); +void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); +SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); /* diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f94a7f3c37c221a41d947e8443dcd7692b875478..9de8c273d95d29fb66b467a1013052953d9d51ce 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -26,7 +26,7 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue cliMsgs; - queue conn; + queue q; uint64_t expireTime; STransCtx ctx; @@ -451,7 +451,7 @@ void cliTimeoutCb(uv_timer_t* handle) { while (p != NULL) { while (!QUEUE_IS_EMPTY(&p->conn)) { queue* h = QUEUE_HEAD(&p->conn); - SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + SCliConn* c = QUEUE_DATA(h, SCliConn, q); if (c->expireTime < currentTime) { QUEUE_REMOVE(h); transUnrefCliHandle(c); @@ -475,7 +475,7 @@ void* destroyConnPool(void* pool) { while (connList != NULL) { while (!QUEUE_IS_EMPTY(&connList->conn)) { queue* h = QUEUE_HEAD(&connList->conn); - SCliConn* c = QUEUE_DATA(h, SCliConn, conn); + SCliConn* c = QUEUE_DATA(h, SCliConn, q); cliDestroyConn(c, true); } connList = taosHashIterate((SHashObj*)pool, connList); @@ -501,11 +501,11 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { return NULL; } queue* h = QUEUE_HEAD(&plist->conn); - SCliConn* conn = QUEUE_DATA(h, SCliConn, conn); + SCliConn* conn = QUEUE_DATA(h, SCliConn, q); conn->status = ConnNormal; - QUEUE_REMOVE(&conn->conn); - QUEUE_INIT(&conn->conn); - assert(h == &conn->conn); + QUEUE_REMOVE(&conn->q); + QUEUE_INIT(&conn->q); + assert(h == &conn->q); return conn; } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -560,8 +560,8 @@ static void addConnToPool(void* pool, SCliConn* conn) { SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); // list already create before assert(plist != NULL); - QUEUE_INIT(&conn->conn); - QUEUE_PUSH(&plist->conn, &conn->conn); + QUEUE_INIT(&conn->q); + QUEUE_PUSH(&plist->conn, &conn->q); assert(!QUEUE_IS_EMPTY(&plist->conn)); } static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { @@ -614,7 +614,7 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { transReqQueueInit(&conn->wreqQueue); transQueueInit(&conn->cliMsgs, NULL); - QUEUE_INIT(&conn->conn); + QUEUE_INIT(&conn->q); conn->hostThrd = pThrd; conn->status = ConnNormal; conn->broken = 0; @@ -626,8 +626,8 @@ static SCliConn* cliCreateConn(SCliThrd* pThrd) { } static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - QUEUE_REMOVE(&conn->conn); - QUEUE_INIT(&conn->conn); + QUEUE_REMOVE(&conn->q); + QUEUE_INIT(&conn->q); transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; @@ -735,7 +735,7 @@ void cliSend(SCliConn* pConn) { CONN_SET_PERSIST_BY_APP(pConn); } - uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); + uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write(req, (uv_stream_t*)pConn->stream, &wb, 1, cliSendCb); return; _RETURN: @@ -990,7 +990,7 @@ static SCliThrd* createThrdObj() { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, cliAsyncCb); + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, cliAsyncCb); uv_timer_init(pThrd->loop, &pThrd->timer); pThrd->timer.data = pThrd; @@ -1009,7 +1009,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); - transDestroyAsyncPool(pThrd->asyncPool); + transAsyncPoolDestroy(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue, destroyCmsg); taosMemoryFree(pThrd->loop); @@ -1054,6 +1054,12 @@ static void doDelayTask(void* param) { cliHandleReq(pMsg, pThrd); } +static void doCloseIdleConn(void* param) { + STaskArg* arg = param; + SCliConn* conn = arg->param1; + SCliThrd* pThrd = arg->param2; +} + static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { STransConnCtx* pCtx = pMsg->ctx; @@ -1075,7 +1081,7 @@ void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { } } -bool cliTryToExtractEpSet(STransMsg* pResp, SEpSet* dst) { +bool cliTryExtractEpSet(STransMsg* pResp, SEpSet* dst) { if ((pResp == NULL || pResp->info.hasEpSet == 0)) { return false; } @@ -1116,7 +1122,8 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { */ STransConnCtx* pCtx = pMsg->ctx; int32_t code = pResp->code; - bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false; + + bool retry = (pTransInst->retry != NULL && pTransInst->retry(code, pResp->msgType - 1)) ? true : false; if (retry) { pMsg->sent = 0; pCtx->retryCnt += 1; @@ -1125,6 +1132,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { if (pCtx->retryCnt < pCtx->retryLimit) { transUnrefCliHandle(pConn); EPSET_FORWARD_INUSE(&pCtx->epSet); + transFreeMsg(pResp->pCont); cliSchedMsgToNextNode(pMsg, pThrd); return -1; } @@ -1148,7 +1156,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STraceId* trace = &pResp->info.traceId; - bool hasEpSet = cliTryToExtractEpSet(pResp, &pCtx->epSet); + bool hasEpSet = cliTryExtractEpSet(pResp, &pCtx->epSet); if (hasEpSet) { char tbuf[256] = {0}; EPSET_DEBUG_STR(&pCtx->epSet, tbuf); @@ -1336,19 +1344,18 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); - if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) { - tsem_destroy(sem); - taosMemoryFree(sem); + int ret = transAsyncSend(pThrd->asyncPool, &cliMsg->q); + if (ret != 0) { destroyCmsg(cliMsg); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return -1; + goto _RETURN; } tsem_wait(sem); + +_RETURN: tsem_destroy(sem); taosMemoryFree(sem); - transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); - return 0; + return ret; } /* * diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index c3cba3118ce79c9896ddd98e05f8a295be8a592a..34849df2b277c0a770677b28a4bd6056666febfd 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -175,7 +175,7 @@ int transSetConnOption(uv_tcp_t* stream) { return ret; } -SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { +SAsyncPool* transAsyncPoolCreate(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); pool->nAsync = sz; pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); @@ -194,7 +194,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) return pool; } -void transDestroyAsyncPool(SAsyncPool* pool) { +void transAsyncPoolDestroy(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); // uv_close((uv_handle_t*)async, NULL); @@ -205,6 +205,14 @@ void transDestroyAsyncPool(SAsyncPool* pool) { taosMemoryFree(pool->asyncs); taosMemoryFree(pool); } +bool transAsyncPoolIsEmpty(SAsyncPool* pool) { + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + if (!QUEUE_IS_EMPTY(&item->qmsg)) return false; + } + return true; +} int transAsyncSend(SAsyncPool* pool, queue* q) { if (atomic_load_8(&pool->stop) == 1) { return -1; @@ -228,14 +236,6 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { } return uv_async_send(async); } -bool transAsyncPoolIsEmpty(SAsyncPool* pool) { - for (int i = 0; i < pool->nAsync; i++) { - uv_async_t* async = &(pool->asyncs[i]); - SAsyncItem* item = async->data; - if (!QUEUE_IS_EMPTY(&item->qmsg)) return false; - } - return true; -} void transCtxInit(STransCtx* ctx) { // init transCtx @@ -308,7 +308,7 @@ void transReqQueueInit(queue* q) { // init req queue QUEUE_INIT(q); } -void* transReqQueuePushReq(queue* q) { +void* transReqQueuePush(queue* q) { uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); STransReq* wreq = taosMemoryCalloc(1, sizeof(STransReq)); wreq->data = req; @@ -488,8 +488,25 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { heapDestroy(queue->heap); taosMemoryFree(queue); } +void transDQCancel(SDelayQueue* queue, SDelayTask* task) { + uv_timer_stop(queue->timer); + + if (heapSize(queue->heap) <= 0) return; + heapRemove(queue->heap, &task->node); -int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { + if (heapSize(queue->heap) != 0) { + HeapNode* minNode = heapMin(queue->heap); + if (minNode != NULL) return; + + uint64_t now = taosGetTimestampMs(); + SDelayTask* task = container_of(minNode, SDelayTask, node); + uint64_t timeout = now > task->execTime ? now - task->execTime : 0; + + uv_timer_start(queue->timer, transDQTimeout, timeout, 0); + } +} + +SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { uint64_t now = taosGetTimestampMs(); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); task->func = func; @@ -507,7 +524,7 @@ int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_ tTrace("timer %p put task into delay queue, timeoutMs:%" PRIu64, queue->timer, timeoutMs); heapInsert(queue->heap, &task->node); uv_timer_start(queue->timer, transDQTimeout, timeoutMs, 0); - return 0; + return task; } void transPrintEpSet(SEpSet* pEpSet) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 7b9402f954484a6383aa82cb04d4cf8948f4fd2a..3fb947bdba975055e0fad34c8e92d1683fb94b10 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -434,7 +434,7 @@ static void uvStartSendRespInternal(SSvrMsg* smsg) { uvPrepareSendData(smsg, &wb); transRefSrvHandle(pConn); - uv_write_t* req = transReqQueuePushReq(&pConn->wreqQueue); + uv_write_t* req = transReqQueuePush(&pConn->wreqQueue); uv_write(req, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } static void uvStartSendResp(SSvrMsg* smsg) { @@ -697,7 +697,7 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { // conn set QUEUE_INIT(&pThrd->conn); - pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); + pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 1, pThrd, uvWorkerAsyncCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); // uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); return true; @@ -976,7 +976,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { taosThreadJoin(pThrd->thread, NULL); SRV_RELEASE_UV(pThrd->loop); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); - transDestroyAsyncPool(pThrd->asyncPool); + transAsyncPoolDestroy(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); }