提交 46b0c2c9 编写于 作者: X Xiaoyu Wang

enh: added memory allocators for parser and planner

上级 8a010a58
...@@ -95,6 +95,8 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in ...@@ -95,6 +95,8 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in
extern int32_t tsQueryPolicy; extern int32_t tsQueryPolicy;
extern int32_t tsQuerySmaOptimize; extern int32_t tsQuerySmaOptimize;
extern bool tsQueryPlannerTrace; extern bool tsQueryPlannerTrace;
extern int32_t tsQueryNodeChunkSize;
extern bool tsQueryUseNodeAllocator;
// client // client
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;
......
...@@ -250,6 +250,7 @@ typedef struct SRequestObj { ...@@ -250,6 +250,7 @@ typedef struct SRequestObj {
bool inRetry; bool inRetry;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry; uint32_t retry;
SNodeAllocator* pNodeAllocator;
} SRequestObj; } SRequestObj;
typedef struct SSyncQueryParam { typedef struct SSyncQueryParam {
......
...@@ -349,6 +349,7 @@ void doDestroyRequest(void *p) { ...@@ -349,6 +349,7 @@ void doDestroyRequest(void *p) {
taosArrayDestroy(pRequest->tableList); taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList); taosArrayDestroy(pRequest->dbList);
taosArrayDestroy(pRequest->targetTableList); taosArrayDestroy(pRequest->targetTableList);
nodesDestroyNodeAllocator(pRequest->pNodeAllocator);
destroyQueryExecRes(&pRequest->body.resInfo.execRes); destroyQueryExecRes(&pRequest->body.resInfo.execRes);
......
...@@ -194,6 +194,17 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, ...@@ -194,6 +194,17 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
if (TSDB_CODE_SUCCESS != nodesCreateNodeAllocator(tsQueryNodeChunkSize, &((*pRequest)->pNodeAllocator))) {
tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql);
destroyRequest(*pRequest);
*pRequest = NULL;
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
}
tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId); tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -700,6 +700,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { ...@@ -700,6 +700,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
pRequest->metric.ctgEnd = taosGetTimestampUs(); pRequest->metric.ctgEnd = taosGetTimestampUs();
nodesResetThreadLevelAllocator(pRequest->pNodeAllocator);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery); code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery; pRequest->stableQuery = pQuery->stableQuery;
...@@ -726,9 +728,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { ...@@ -726,9 +728,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
pRequest->requestId); pRequest->requestId);
launchAsyncQuery(pRequest, pQuery, pResultMeta); launchAsyncQuery(pRequest, pQuery, pResultMeta);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
nodesResetThreadLevelAllocator(NULL);
} else { } else {
destorySqlParseWrapper(pWrapper); destorySqlParseWrapper(pWrapper);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
nodesResetThreadLevelAllocator(NULL);
if (NEED_CLIENT_HANDLE_ERROR(code)) { if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
...@@ -801,6 +805,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -801,6 +805,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
} }
SQuery *pQuery = NULL; SQuery *pQuery = NULL;
nodesResetThreadLevelAllocator(pRequest->pNodeAllocator);
pRequest->metric.syntaxStart = taosGetTimestampUs(); pRequest->metric.syntaxStart = taosGetTimestampUs();
...@@ -844,6 +849,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { ...@@ -844,6 +849,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
&pRequest->body.queryJob); &pRequest->body.queryJob);
pCxt = NULL; pCxt = NULL;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
nodesResetThreadLevelAllocator(NULL);
return; return;
} }
...@@ -851,6 +857,7 @@ _error: ...@@ -851,6 +857,7 @@ _error:
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId); pRequest->requestId);
taosMemoryFree(pCxt); taosMemoryFree(pCxt);
nodesResetThreadLevelAllocator(NULL);
terrno = code; terrno = code;
pRequest->code = code; pRequest->code = code;
......
...@@ -92,6 +92,8 @@ bool tsSmlDataFormat = ...@@ -92,6 +92,8 @@ bool tsSmlDataFormat =
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;
bool tsQueryPlannerTrace = false; bool tsQueryPlannerTrace = false;
int32_t tsQueryNodeChunkSize = 32 * 1024;
bool tsQueryUseNodeAllocator = true;
/* /*
* denote if the server needs to compress response message at the application layer to client, including query rsp, * denote if the server needs to compress response message at the application layer to client, including query rsp,
...@@ -284,6 +286,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { ...@@ -284,6 +286,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1; if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1; if (cfgAddBool(pCfg, "queryPlannerTrace", tsQueryPlannerTrace, true) != 0) return -1;
if (cfgAddInt32(pCfg, "queryNodeChunkSize", tsQueryNodeChunkSize, 1024, 128 * 1024, true) != 0) return -1;
if (cfgAddBool(pCfg, "queryUseNodeAllocator", tsQueryUseNodeAllocator, true) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
...@@ -385,9 +389,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -385,9 +389,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4); tsNumOfQnodeQueryThreads = TMAX(tsNumOfQnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1; if (cfgAddInt32(pCfg, "numOfQnodeQueryThreads", tsNumOfQnodeQueryThreads, 1, 1024, 0) != 0) return -1;
// tsNumOfQnodeFetchThreads = tsNumOfCores / 2; // tsNumOfQnodeFetchThreads = tsNumOfCores / 2;
// tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); // tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
// if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1; // if (cfgAddInt32(pCfg, "numOfQnodeFetchThreads", tsNumOfQnodeFetchThreads, 1, 1024, 0) != 0) return -1;
tsNumOfSnodeSharedThreads = tsNumOfCores / 4; tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4); tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
...@@ -527,15 +531,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) { ...@@ -527,15 +531,15 @@ static int32_t taosUpdateServerCfg(SConfig *pCfg) {
pItem->stype = stype; pItem->stype = stype;
} }
/* /*
pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads"); pItem = cfgGetItem(tsCfg, "numOfQnodeFetchThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
tsNumOfQnodeFetchThreads = numOfCores / 2; tsNumOfQnodeFetchThreads = numOfCores / 2;
tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4); tsNumOfQnodeFetchThreads = TMAX(tsNumOfQnodeFetchThreads, 4);
pItem->i32 = tsNumOfQnodeFetchThreads; pItem->i32 = tsNumOfQnodeFetchThreads;
pItem->stype = stype; pItem->stype = stype;
} }
*/ */
pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads"); pItem = cfgGetItem(tsCfg, "numOfSnodeSharedThreads");
if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) { if (pItem != NULL && pItem->stype == CFG_STYPE_DEFAULT) {
...@@ -643,6 +647,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -643,6 +647,8 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32; tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32; tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
return 0; return 0;
} }
...@@ -693,7 +699,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -693,7 +699,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32; tsNumOfVnodeSyncThreads = cfgGetItem(pCfg, "numOfVnodeSyncThreads")->i32;
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
// tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; // tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32; tsNumOfSnodeUniqueThreads = cfgGetItem(pCfg, "numOfSnodeUniqueThreads")->i32;
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
...@@ -941,10 +947,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -941,10 +947,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32; tsNumOfVnodeRsmaThreads = cfgGetItem(pCfg, "numOfVnodeRsmaThreads")->i32;
} else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) { } else if (strcasecmp("numOfQnodeQueryThreads", name) == 0) {
tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32; tsNumOfQnodeQueryThreads = cfgGetItem(pCfg, "numOfQnodeQueryThreads")->i32;
/* /*
} else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) { } else if (strcasecmp("numOfQnodeFetchThreads", name) == 0) {
tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32; tsNumOfQnodeFetchThreads = cfgGetItem(pCfg, "numOfQnodeFetchThreads")->i32;
*/ */
} else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) { } else if (strcasecmp("numOfSnodeSharedThreads", name) == 0) {
tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32; tsNumOfSnodeSharedThreads = cfgGetItem(pCfg, "numOfSnodeSharedThreads")->i32;
} else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) { } else if (strcasecmp("numOfSnodeUniqueThreads", name) == 0) {
...@@ -976,6 +982,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -976,6 +982,10 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32; qDebugFlag = cfgGetItem(pCfg, "qDebugFlag")->i32;
} else if (strcasecmp("queryPlannerTrace", name) == 0) { } else if (strcasecmp("queryPlannerTrace", name) == 0) {
tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval; tsQueryPlannerTrace = cfgGetItem(pCfg, "queryPlannerTrace")->bval;
} else if (strcasecmp("queryNodeChunkSize", name) == 0) {
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
} else if (strcasecmp("queryUseNodeAllocator", name) == 0) {
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
} }
break; break;
} }
......
...@@ -58,9 +58,9 @@ static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) { ...@@ -58,9 +58,9 @@ static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) {
return pNewChunk; return pNewChunk;
} }
static void* nodesCalloc(int32_t num, int32_t size) { static void* nodesCallocImpl(int32_t size) {
if (NULL == pNodeAllocator) { if (NULL == pNodeAllocator) {
return taosMemoryCalloc(num, size); return taosMemoryCalloc(1, size);
} }
if (pNodeAllocator->pCurrChunk->usedSize + size > pNodeAllocator->pCurrChunk->availableSize) { if (pNodeAllocator->pCurrChunk->usedSize + size > pNodeAllocator->pCurrChunk->availableSize) {
...@@ -73,9 +73,19 @@ static void* nodesCalloc(int32_t num, int32_t size) { ...@@ -73,9 +73,19 @@ static void* nodesCalloc(int32_t num, int32_t size) {
return p; return p;
} }
static void* nodesCalloc(int32_t num, int32_t size) {
void* p = nodesCallocImpl(num * size + 1);
if (NULL == p) {
return NULL;
}
*(char*)p = (NULL != pNodeAllocator) ? 1 : 0;
return (char*)p + 1;
}
static void nodesFree(void* p) { static void nodesFree(void* p) {
if (NULL == pNodeAllocator) { char* ptr = (char*)p - 1;
taosMemoryFree(p); if (0 == *ptr) {
taosMemoryFree(ptr);
} }
return; return;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册