diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 1e2a53f598cb75a7ff3936b1a895b8d1e78e3e52..3d5b04ddf46194947bf8ef0fe3248e7a38bee4d1 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -278,9 +278,15 @@ typedef struct SNodeList { typedef struct SNodeAllocator SNodeAllocator; int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator); -void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator); +void nodesDestroyNodeAllocator(void* pAllocator); void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator); +int32_t nodesAllocatorInit(); +int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId); +void nodesDestroyAllocator(int64_t refId); +void nodesResetAllocator(int64_t refId); +int64_t nodesIncAllocatorRefCount(int64_t refId); + SNode* nodesMakeNode(ENodeType type); void nodesDestroyNode(SNode* pNode); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index e6973cd390c10ff524f70549d161090582ee56ab..738d057e6ac531e726cd069eaf0de35bdc15c365 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -67,6 +67,7 @@ typedef struct SSchedulerReq { SRequestConnInfo *pConn; SArray *pNodeList; SQueryPlan *pDag; + int64_t allocatorRefId; const char *sql; int64_t startTs; schedulerExecFp execFp; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index f042c3ad7c19511392358268fefe0af9a76ff647..eca1a0ebbea8ab2d77949c4f2b660cdb9be049d1 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -250,7 +250,8 @@ typedef struct SRequestObj { bool inRetry; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; - SNodeAllocator* pNodeAllocator; + // SNodeAllocator* pNodeAllocator; + int64_t allocatorRefId; } SRequestObj; typedef struct SSyncQueryParam { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 8fded0347219e7630c85b39c593318382afac466..e95a2d28711c004a272225a146f662c5aa8f2d30 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -349,7 +349,7 @@ void doDestroyRequest(void *p) { taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->targetTableList); - nodesDestroyNodeAllocator(pRequest->pNodeAllocator); + nodesDestroyAllocator(pRequest->allocatorRefId); destroyQueryExecRes(&pRequest->body.resInfo.execRes); @@ -412,6 +412,7 @@ void taos_init_imp(void) { initTaskQueue(); fmFuncMgtInit(); + nodesAllocatorInit(); clientConnRefPool = taosOpenRef(200, destroyTscObj); clientReqRefPool = taosOpenRef(40960, doDestroyRequest); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index bc1cdc247e5facfb35f89ee4b2e5608fe02078e2..75e0966243d46b4cbe9adb79712491c801def9f1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -194,8 +194,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, return TSDB_CODE_TSC_OUT_OF_MEMORY; } + (*pRequest)->allocatorRefId = -1; if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) { - if (TSDB_CODE_SUCCESS != nodesCreateNodeAllocator(tsQueryNodeChunkSize, &((*pRequest)->pNodeAllocator))) { + if (TSDB_CODE_SUCCESS != nodesCreateAllocator(tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) { tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql); @@ -1058,6 +1059,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .pConn = &conn, .pNodeList = pNodeList, .pDag = pDag, + .allocatorRefId = pRequest->allocatorRefId, .sql = pRequest->sqlstr, .startTs = pRequest->metric.start, .execFp = schedulerExecCb, diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index e19d88fcf31530c2458646b4e5d69aa8e60ca8e2..c3f8ca32b897eb6a8e0d692c61996e4f49ed06ad 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -700,7 +700,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->metric.ctgEnd = taosGetTimestampUs(); - nodesResetThreadLevelAllocator(pRequest->pNodeAllocator); + nodesResetAllocator(pRequest->allocatorRefId); if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); @@ -728,11 +728,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->requestId); launchAsyncQuery(pRequest, pQuery, pResultMeta); qDestroyQuery(pQuery); - nodesResetThreadLevelAllocator(NULL); + nodesResetAllocator(-1); } else { destorySqlParseWrapper(pWrapper); qDestroyQuery(pQuery); - nodesResetThreadLevelAllocator(NULL); + nodesResetAllocator(-1); if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); @@ -805,7 +805,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } SQuery *pQuery = NULL; - nodesResetThreadLevelAllocator(pRequest->pNodeAllocator); + nodesResetAllocator(pRequest->allocatorRefId); pRequest->metric.syntaxStart = taosGetTimestampUs(); @@ -849,7 +849,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { &pRequest->body.queryJob); pCxt = NULL; if (code == TSDB_CODE_SUCCESS) { - nodesResetThreadLevelAllocator(NULL); + nodesResetAllocator(-1); return; } @@ -857,7 +857,7 @@ _error: tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); taosMemoryFree(pCxt); - nodesResetThreadLevelAllocator(NULL); + nodesResetAllocator(-1); terrno = code; pRequest->code = code; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 8b9b723a2ad45249cabbb0e79e7281ca7feb1d73..b7caead3e5871680a127d667700d30aca258da01 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -21,6 +21,7 @@ #include "taoserror.h" #include "tdatablock.h" #include "thash.h" +#include "tref.h" typedef struct SNodeMemChunk { int32_t availableSize; @@ -30,6 +31,7 @@ typedef struct SNodeMemChunk { } SNodeMemChunk; typedef struct SNodeAllocator { + int64_t self; int32_t chunkSize; int32_t chunkNum; SNodeMemChunk* pCurrChunk; @@ -37,6 +39,22 @@ typedef struct SNodeAllocator { } SNodeAllocator; static threadlocal SNodeAllocator* pNodeAllocator; +static int32_t allocatorReqRefPool = -1; + +int32_t nodesAllocatorInit() { + if (allocatorReqRefPool >= 0) { + nodesWarn("nodes already initialized"); + return TSDB_CODE_SUCCESS; + } + + allocatorReqRefPool = taosOpenRef(40960, nodesDestroyNodeAllocator); + if (allocatorReqRefPool < 0) { + nodesError("init nodes failed"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + return TSDB_CODE_SUCCESS; +} static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize); @@ -103,11 +121,13 @@ int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) return TSDB_CODE_SUCCESS; } -void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator) { - if (NULL == pAllocator) { +void nodesDestroyNodeAllocator(void* p) { + if (NULL == p) { return; } + SNodeAllocator* pAllocator = p; + nodesDebug("alloc chunkNum: %d, chunkTotakSize: %d", pAllocator->chunkNum, pAllocator->chunkNum * pAllocator->chunkSize); @@ -122,6 +142,40 @@ void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator) { void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator) { pNodeAllocator = pAllocator; } +int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId) { + SNodeAllocator* pAllocator = NULL; + int32_t code = nodesCreateNodeAllocator(chunkSize, &pAllocator); + if (TSDB_CODE_SUCCESS == code) { + pAllocator->self = taosAddRef(allocatorReqRefPool, pAllocator); + *pRefId = pAllocator->self; + } + return code; +} + +void nodesDestroyAllocator(int64_t refId) { + if (refId < 0) { + return; + } + taosReleaseRef(allocatorReqRefPool, refId); +} + +void nodesResetAllocator(int64_t refId) { + if (refId < 0) { + pNodeAllocator = NULL; + } else { + pNodeAllocator = taosAcquireRef(allocatorReqRefPool, refId); + taosReleaseRef(allocatorReqRefPool, refId); + } +} + +int64_t nodesIncAllocatorRefCount(int64_t refId) { + if (refId < 0) { + return -1; + } + SNodeAllocator* pAllocator = taosAcquireRef(allocatorReqRefPool, refId); + return pAllocator->self; +} + static SNode* makeNode(ENodeType type, int32_t size) { SNode* p = nodesCalloc(1, size); if (NULL == p) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 7fea2867323f9d5f119740d94a80da4f1a0b5b51..a62531a8757f272f2ada32be4966e349ec46e769 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -254,7 +254,8 @@ typedef struct SSchJob { SRequestConnInfo conn; SArray *nodeList; // qnode/vnode list, SArray SArray *levels; // starting from 0. SArray - SQueryPlan *pDag; + SQueryPlan *pDag; + int64_t allocatorRefId; SArray *dataSrcTasks; // SArray int32_t levelIdx; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 98501427ab7b006daa78bc5d1c6c7c8d377572a0..345f4680b0c6fc0fd7e4783165e614d1dd5e5ebb 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -673,6 +673,7 @@ void schFreeJobImpl(void *job) { destroyQueryExecRes(&pJob->execRes); qDestroyQueryPlan(pJob->pDag); + nodesDestroyAllocator(pJob->allocatorRefId); taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->fetchRes); @@ -724,6 +725,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { pJob->sql = strdup(pReq->sql); } pJob->pDag = pReq->pDag; + pJob->allocatorRefId = nodesIncAllocatorRefCount(pReq->allocatorRefId); pJob->chkKillFp = pReq->chkKillFp; pJob->chkKillParam = pReq->chkKillParam; pJob->userRes.execFp = pReq->execFp;