diff --git a/source/libs/index/src/indexFilter.c b/source/libs/index/src/indexFilter.c index 7fc41b8dff3a6344186ce0bf82c83762b15d8a81..e1c8ac0204f77f6e4756cd05fba36141b911290f 100644 --- a/source/libs/index/src/indexFilter.c +++ b/source/libs/index/src/indexFilter.c @@ -579,11 +579,13 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou if (ctx->noExec == false) { for (int32_t m = 0; m < node->pParameterList->length; m++) { - // add impl later if (node->condType == LOGIC_COND_TYPE_AND) { taosArrayAddAll(output->result, params[m].result); + // taosArrayDestroy(params[m].result); + // params[m].result = NULL; } else if (node->condType == LOGIC_COND_TYPE_OR) { taosArrayAddAll(output->result, params[m].result); + // params[m].result = NULL; } else if (node->condType == LOGIC_COND_TYPE_NOT) { // taosArrayAddAll(output->result, params[m].result); } @@ -593,6 +595,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou } else { for (int32_t m = 0; m < node->pParameterList->length; m++) { output->status = sifMergeCond(node->condType, output->status, params[m].status); + taosArrayDestroy(params[m].result); + params[m].result = NULL; } } _return: @@ -607,6 +611,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) { SIFCtx *ctx = context; ctx->code = sifExecFunction(node, ctx, &output); if (ctx->code != TSDB_CODE_SUCCESS) { + sifFreeParam(&output); return DEAL_RES_ERROR; } @@ -624,6 +629,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) { SIFCtx *ctx = context; ctx->code = sifExecLogic(node, ctx, &output); if (ctx->code) { + sifFreeParam(&output); return DEAL_RES_ERROR; } @@ -640,6 +646,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) { SIFCtx *ctx = context; ctx->code = sifExecOper(node, ctx, &output); if (ctx->code) { + sifFreeParam(&output); return DEAL_RES_ERROR; } if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { @@ -698,7 +705,11 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { } nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); - SIF_ERR_RET(ctx.code); + + if (ctx.code != 0) { + sifFreeRes(ctx.pRes); + return ctx.code; + } if (pDst) { SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); @@ -714,8 +725,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); } sifFreeRes(ctx.pRes); - - SIF_RET(code); + return code; } static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { @@ -732,8 +742,10 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { } nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); - - SIF_ERR_RET(ctx.code); + if (ctx.code != 0) { + sifFreeRes(ctx.pRes); + return ctx.code; + } SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); if (res == NULL) { @@ -745,8 +757,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { sifFreeParam(res); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashCleanup(ctx.pRes); - - SIF_RET(code); + return code; } int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status) { @@ -760,7 +771,11 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SArray *output = taosArrayInit(8, sizeof(uint64_t)); SIFParam param = {.arg = *metaArg, .result = output}; - SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, ¶m)); + int32_t code = sifCalculate((SNode *)pFilterNode, ¶m); + if (code != 0) { + sifFreeParam(¶m); + return code; + } taosArrayAddAll(result, param.result); sifFreeParam(¶m); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 24a33d96d3c79f4cdb91582e5b13e391f77ec04e..efbe110f6f224b2740281dadf420b1e551fb4c2c 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1,5 +1,4 @@ /** Copyright (c) 2019 TAOS Data, Inc. - * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -16,6 +15,10 @@ #ifdef USE_UV #include "transComm.h" +typedef struct SConnList { + queue conn; +} SConnList; + typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -26,7 +29,9 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue cliMsgs; - queue q; + + queue q; + SConnList* list; STransCtx ctx; bool broken; // link broken or not @@ -56,13 +61,14 @@ typedef struct SCliMsg { } SCliMsg; typedef struct SCliThrd { - TdThread thread; // tid - int64_t pid; // pid - uv_loop_t* loop; - SAsyncPool* asyncPool; - uv_idle_t* idle; - uv_timer_t timer; - void* pool; // conn pool + TdThread thread; // tid + int64_t pid; // pid + uv_loop_t* loop; + SAsyncPool* asyncPool; + uv_idle_t* idle; + uv_prepare_t* prepare; + uv_timer_t timer; + void* pool; // conn pool // msg queue queue msg; @@ -86,10 +92,6 @@ typedef struct SCliObj { SCliThrd** pThreadObj; } SCliObj; -typedef struct SConnList { - queue conn; -} SConnList; - // conn pool // add expire timeout and capacity limit static void* createConnPool(int size); @@ -101,7 +103,7 @@ static void doCloseIdleConn(void* param); static int sockDebugInfo(struct sockaddr* sockname, char* dst) { struct sockaddr_in addr = *(struct sockaddr_in*)sockname; - char buf[20] = {0}; + char buf[16] = {0}; int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf)); sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); return r; @@ -118,6 +120,9 @@ static void cliSendCb(uv_write_t* req, int status); static void cliConnCb(uv_connect_t* req, int status); static void cliAsyncCb(uv_async_t* handle); static void cliIdleCb(uv_idle_t* handle); +static void cliPrepareCb(uv_prepare_t* handle); + +static int32_t allocConnRef(SCliConn* conn, bool update); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); @@ -198,7 +203,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { pThrd = (SCliThrd*)(exh)->pThrd; \ } \ } while (0) -#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para)) +#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ @@ -499,9 +504,8 @@ void* destroyConnPool(void* pool) { } static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { - char key[128] = {0}; + char key[32] = {0}; CONN_CONSTRUCT_HASH_KEY(key, ip, port); - SHashObj* pPool = pool; SConnList* plist = taosHashGet(pPool, key, strlen(key)); if (plist == NULL) { @@ -519,13 +523,44 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { conn->status = ConnNormal; QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); - assert(h == &conn->q); transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); conn->task = NULL; return conn; } +static void addConnToPool(void* pool, SCliConn* conn) { + if (conn->status == ConnInPool) { + return; + } + SCliThrd* thrd = conn->hostThrd; + CONN_HANDLE_THREAD_QUIT(thrd); + + allocConnRef(conn, true); + + STrans* pTransInst = thrd->pTransInst; + cliReleaseUnfinishedMsg(conn); + transQueueClear(&conn->cliMsgs); + transCtxCleanup(&conn->ctx); + conn->status = ConnInPool; + + if (conn->list == NULL) { + char key[32] = {0}; + CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); + tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); + conn->list = taosHashGet((SHashObj*)pool, key, strlen(key)); + } + assert(conn->list != NULL); + QUEUE_INIT(&conn->q); + QUEUE_PUSH(&conn->list->conn, &conn->q); + + assert(!QUEUE_IS_EMPTY(&conn->list->conn)); + + STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + arg->param1 = conn; + arg->param2 = thrd; + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); +} static int32_t allocConnRef(SCliConn* conn, bool update) { if (update) { transRemoveExHandle(transGetRefMgt(), conn->refId); @@ -556,38 +591,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { return 0; } -static void addConnToPool(void* pool, SCliConn* conn) { - if (conn->status == ConnInPool) { - return; - } - SCliThrd* thrd = conn->hostThrd; - CONN_HANDLE_THREAD_QUIT(thrd); - - allocConnRef(conn, true); - - STrans* pTransInst = thrd->pTransInst; - cliReleaseUnfinishedMsg(conn); - transQueueClear(&conn->cliMsgs); - transCtxCleanup(&conn->ctx); - conn->status = ConnInPool; - - char key[128] = {0}; - CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); - tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap); - - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); - // list already create before - assert(plist != NULL); - QUEUE_INIT(&conn->q); - QUEUE_PUSH(&plist->conn, &conn->q); - - assert(!QUEUE_IS_EMPTY(&plist->conn)); - - STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); - arg->param1 = conn; - arg->param2 = thrd; - conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); -} static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; @@ -965,6 +968,62 @@ static void cliAsyncCb(uv_async_t* handle) { static void cliIdleCb(uv_idle_t* handle) { SCliThrd* thrd = handle->data; tTrace("do idle work"); + + SAsyncPool* pool = thrd->asyncPool; + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + if (pMsg == NULL) { + continue; + } + (*cliAsyncHandle[pMsg->type])(pMsg, thrd); + count++; + } + } + tTrace("prepare work end"); + if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); +} +static void cliPrepareCb(uv_prepare_t* handle) { + SCliThrd* thrd = handle->data; + tTrace("prepare work start"); + + SAsyncPool* pool = thrd->asyncPool; + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + + SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q); + if (pMsg == NULL) { + continue; + } + (*cliAsyncHandle[pMsg->type])(pMsg, thrd); + count++; + } + } + tTrace("prepare work end"); + if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd); } static void* cliWorkThread(void* arg) { @@ -1031,7 +1090,12 @@ static SCliThrd* createThrdObj() { // pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t)); // uv_idle_init(pThrd->loop, pThrd->idle); // pThrd->idle->data = pThrd; - // uv_idle_start(pThrd->idle, cliIdleCb); + // uv_idle_start(pThrd->idle, cliIdleCb); + + pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); + uv_prepare_init(pThrd->loop, pThrd->prepare); + pThrd->prepare->data = pThrd; + uv_prepare_start(pThrd->prepare, cliPrepareCb); pThrd->pool = createConnPool(4); transDQCreate(pThrd->loop, &pThrd->delayQueue); @@ -1056,6 +1120,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { transDQDestroy(pThrd->timeoutQueue, NULL); taosMemoryFree(pThrd->idle); + taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index c99effb26f4628986e9c3be266219ff371e8ff93..8cf525a506fe856876f2e8577e6f97dedbdb8d26 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -120,8 +120,9 @@ int transInitBuffer(SConnBuffer* buf) { buf->total = 0; return 0; } -int transDestroyBuffer(SConnBuffer* buf) { - taosMemoryFree(buf->buf); +int transDestroyBuffer(SConnBuffer* p) { + taosMemoryFree(p->buf); + p->buf = NULL; return 0; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index a97e0b53c1c5c4b443ddabe4061885bc64ee824d..4b579a1f9527bde98f9ac4d21723ed9d17e965d6 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -73,6 +73,7 @@ typedef struct SWorkThrd { uv_os_fd_t fd; uv_loop_t* loop; SAsyncPool* asyncPool; + uv_prepare_t* prepare; queue msg; TdThreadMutex msgMtx; @@ -112,6 +113,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); +static void uvPrepareCb(uv_prepare_t* handle); /* * time-consuming task throwed into BG work thread @@ -238,8 +240,6 @@ static void uvHandleReq(SSvrConn* pConn) { transMsg.msgType = pHead->msgType; transMsg.code = pHead->code; - // transClearBuffer(&pConn->readBuf); - pConn->inType = pHead->msgType; if (pConn->status == ConnNormal) { if (pHead->persist == 1) { @@ -546,6 +546,52 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { uv_close((uv_handle_t*)req->handle, uvDestroyConn); taosMemoryFree(req); } +static void uvPrepareCb(uv_prepare_t* handle) { + // prepare callback + SWorkThrd* pThrd = handle->data; + SAsyncPool* pool = pThrd->asyncPool; + + for (int i = 0; i < pool->nAsync; i++) { + uv_async_t* async = &(pool->asyncs[i]); + SAsyncItem* item = async->data; + + queue wq; + taosThreadMutexLock(&item->mtx); + QUEUE_MOVE(&item->qmsg, &wq); + taosThreadMutexUnlock(&item->mtx); + + while (!QUEUE_IS_EMPTY(&wq)) { + queue* head = QUEUE_HEAD(&wq); + QUEUE_REMOVE(head); + + SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q); + if (msg == NULL) { + tError("unexcept occurred, continue"); + continue; + } + // release handle to rpc init + if (msg->type == Quit) { + (*transAsyncHandle[msg->type])(msg, pThrd); + continue; + } else { + STransMsg transMsg = msg->msg; + + SExHandle* exh1 = transMsg.info.handle; + int64_t refId = transMsg.info.refId; + SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId); + if (exh2 == NULL || exh1 != exh2) { + tTrace("handle except msg %p, ignore it", exh1); + transReleaseExHandle(transGetRefMgt(), refId); + destroySmsg(msg); + continue; + } + msg->pConn = exh1->handle; + transReleaseExHandle(transGetRefMgt(), refId); + (*transAsyncHandle[msg->type])(msg, pThrd); + } + } + } +} static void uvWorkDoTask(uv_work_t* req) { // doing time-consumeing task @@ -695,13 +741,17 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { } uv_pipe_init(pThrd->loop, pThrd->pipe, 1); - // int r = uv_pipe_open(pThrd->pipe, pThrd->fd); pThrd->pipe->data = pThrd; QUEUE_INIT(&pThrd->msg); taosThreadMutexInit(&pThrd->msgMtx, NULL); + pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); + uv_prepare_init(pThrd->loop, pThrd->prepare); + uv_prepare_start(pThrd->prepare, uvPrepareCb); + pThrd->prepare->data = pThrd; + // conn set QUEUE_INIT(&pThrd->conn); @@ -986,6 +1036,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { SRV_RELEASE_UV(pThrd->loop); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); transAsyncPoolDestroy(pThrd->asyncPool); + taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); }