diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2de4ffdc17347f2e3afb2793a95e32b4cea966e6..234937ea7ea0fad7accf03e0e5d1ea4b60e674ff 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -95,6 +95,8 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in extern int32_t tsQueryPolicy; extern int32_t tsQuerySmaOptimize; extern bool tsQueryPlannerTrace; +extern int32_t tsQueryNodeChunkSize; +extern bool tsQueryUseNodeAllocator; // client extern int32_t tsMinSlidingTime; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index b8fa9580e70c1c7aa17a1402ce6c8113a7f8e094..f042c3ad7c19511392358268fefe0af9a76ff647 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -250,6 +250,7 @@ typedef struct SRequestObj { bool inRetry; uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t retry; + SNodeAllocator* pNodeAllocator; } SRequestObj; typedef struct SSyncQueryParam { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index b739aedca0ff7c8bd8e408e2e456aa7414f1ac30..8fded0347219e7630c85b39c593318382afac466 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -349,6 +349,7 @@ void doDestroyRequest(void *p) { taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->targetTableList); + nodesDestroyNodeAllocator(pRequest->pNodeAllocator); destroyQueryExecRes(&pRequest->body.resInfo.execRes); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0e1d82b273523a02a40ee0605f7ab259013bfe2a..bc1cdc247e5facfb35f89ee4b2e5608fe02078e2 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -194,6 +194,17 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, return TSDB_CODE_TSC_OUT_OF_MEMORY; } + if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) { + if (TSDB_CODE_SUCCESS != nodesCreateNodeAllocator(tsQueryNodeChunkSize, &((*pRequest)->pNodeAllocator))) { + tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self, + (*pRequest)->requestId, pTscObj->id, sql); + + destroyRequest(*pRequest); + *pRequest = NULL; + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + } + tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); return TSDB_CODE_SUCCESS; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 73636e7372b97d9cc13926ac9cafccef315e3d70..e19d88fcf31530c2458646b4e5d69aa8e60ca8e2 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -700,6 +700,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->metric.ctgEnd = taosGetTimestampUs(); + nodesResetThreadLevelAllocator(pRequest->pNodeAllocator); + if (code == TSDB_CODE_SUCCESS) { code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); pRequest->stableQuery = pQuery->stableQuery; @@ -726,9 +728,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { pRequest->requestId); launchAsyncQuery(pRequest, pQuery, pResultMeta); qDestroyQuery(pQuery); + nodesResetThreadLevelAllocator(NULL); } else { destorySqlParseWrapper(pWrapper); qDestroyQuery(pQuery); + nodesResetThreadLevelAllocator(NULL); if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); @@ -801,6 +805,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { } SQuery *pQuery = NULL; + nodesResetThreadLevelAllocator(pRequest->pNodeAllocator); pRequest->metric.syntaxStart = taosGetTimestampUs(); @@ -844,6 +849,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { &pRequest->body.queryJob); pCxt = NULL; if (code == TSDB_CODE_SUCCESS) { + nodesResetThreadLevelAllocator(NULL); return; } @@ -851,6 +857,7 @@ _error: tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->requestId); taosMemoryFree(pCxt); + nodesResetThreadLevelAllocator(NULL); terrno = code; pRequest->code = code; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ddda8f8c9aafcdf3558525bd51666d9975f555c4..e119dae7432054de0c9da545e6d241632754099e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -92,6 +92,8 @@ bool tsSmlDataFormat = int32_t tsQueryPolicy = 1; int32_t tsQuerySmaOptimize = 0; bool tsQueryPlannerTrace = false; +int32_t tsQueryNodeChunkSize = 32 * 1024; +bool tsQueryUseNodeAllocator = true; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -284,6 +286,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1; + if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1; + if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, true) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; @@ -385,9 +389,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; -// tsNumOfQnodeFetchThreads = tsNumOfCores / 2; -// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); -// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; + // tsNumOfQnodeFetchThreads = tsNumOfCores / 2; + // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); + // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); @@ -527,15 +531,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { pItem->stype = stype; } -/* - pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); - if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { - tsNumOfQnodeFetchThreads = numOfCores / 2; - tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); - pItem->i32 = tsNumOfQnodeFetchThreads; - pItem->stype = stype; - } -*/ + /* + pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); + if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { + tsNumOfQnodeFetchThreads = numOfCores / 2; + tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); + pItem->i32 = tsNumOfQnodeFetchThreads; + pItem->stype = stype; + } + */ pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { @@ -643,6 +647,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; + tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32; + tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval; return 0; } @@ -693,7 +699,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; -// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; @@ -941,10 +947,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; -/* - } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { - tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; -*/ + /* + } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { + tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; + */ } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { @@ -976,6 +982,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32; } else if (strcasecmp("queryPlannerTrace", name) == 0) { tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; + } else if (strcasecmp("queryNodeChunkSize", name) == 0) { + tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32; + } else if (strcasecmp("queryUseNodeAllocator", name) == 0) { + tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval; } break; } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index be94495856a32c34cf272688b77e207c025be094..8b9b723a2ad45249cabbb0e79e7281ca7feb1d73 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -58,9 +58,9 @@ static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { return pNewChunk; } -static void* nodesCalloc(int32_t num, int32_t size) { +static void* nodesCallocImpl(int32_t size) { if (NULL == pNodeAllocator) { - return taosMemoryCalloc(num, size); + return taosMemoryCalloc(1, size); } if (pNodeAllocator->pCurrChunk->usedSize + size > pNodeAllocator->pCurrChunk->availableSize) { @@ -73,9 +73,19 @@ static void* nodesCalloc(int32_t num, int32_t size) { return p; } +static void* nodesCalloc(int32_t num, int32_t size) { + void* p = nodesCallocImpl(num * size + 1); + if (NULL == p) { + return NULL; + } + *(char*)p = (NULL != pNodeAllocator) ? 1 : 0; + return (char*)p + 1; +} + static void nodesFree(void* p) { - if (NULL == pNodeAllocator) { - taosMemoryFree(p); + char* ptr = (char*)p - 1; + if (0 == *ptr) { + taosMemoryFree(ptr); } return; }