diff --git a/source/libs/index/inc/indexUtil.h b/source/libs/index/inc/indexUtil.h index 814d61afd737a2e455ff4670ee9192fe54ed3ec1..f1676ed411a5e2074667816d1746dc607dc0f44d 100644 --- a/source/libs/index/inc/indexUtil.h +++ b/source/libs/index/inc/indexUtil.h @@ -68,7 +68,7 @@ extern "C" { */ void iIntersection(SArray *interResults, SArray *finalResult); -/* multi sorted result intersection +/* multi sorted result union * input: [1, 2, 4, 5] * [2, 3, 4, 5] * [1, 4, 5] @@ -76,7 +76,7 @@ void iIntersection(SArray *interResults, SArray *finalResult); */ void iUnion(SArray *interResults, SArray *finalResult); -/* sorted array +/* see example * total: [1, 2, 4, 5, 7, 8] * except: [4, 5] * return: [1, 2, 7, 8] saved in total diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 80b5d88a9040623b2e86eea2dce2445d5e60f0fc..570eb484d8a019f0f1b107bfba02fc222b6509a9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -891,7 +891,6 @@ static void doDelayTask(void* param) { SCliMsg* pMsg = arg->param1; SCliThrdObj* pThrd = arg->param2; - cliHandleReq(pMsg, pThrd); taosMemoryFree(arg); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 01a20a466ac7419c6b803fa4b171ecd9f5e8daea..1bd7d0857e5458d2c3b70d1ef450a7826de38485 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -425,10 +425,19 @@ void transDQDestroy(SDelayQueue* queue) { } int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs) { + uint64_t now = taosGetTimestampMs(); SDelayTask* task = taosMemoryCalloc(1, sizeof(SDelayTask)); task->func = func; task->arg = arg; - task->execTime = taosGetTimestampMs() + timeoutMs; + task->execTime = now + timeoutMs; + + HeapNode* minNode = heapMin(queue->heap); + if (minNode) { + SDelayTask* minTask = container_of(minNode, SDelayTask, node); + if (minTask->execTime < task->execTime) { + timeoutMs = minTask->execTime <= now ? 0 : now - minTask->execTime; + } + } tTrace("timer %p put task into queue, timeoutMs: %" PRIu64 "", queue->timer, timeoutMs); heapInsert(queue->heap, &task->node); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 916bcb886258b550d4270166f0538baa52b7ba70..7378ca324139d0854295324a623aa2342e809a9e 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -810,12 +810,14 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)taosMemoryCalloc(1, sizeof(SWorkThrdObj)); + thrd->pTransInst = shandle; thrd->quit = false; srv->pThreadObj[i] = thrd; thrd->pTransInst = shandle; srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); + uv_os_sock_t fds[2]; if (uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { goto End;