提交 2f475399 编写于 作者: X Xiaoyu Wang

enh: added memory allocators for parser and planner

上级 46b0c2c9
......@@ -278,9 +278,15 @@ typedef struct SNodeList {
typedef struct SNodeAllocator SNodeAllocator;
int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator);
void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator);
void nodesDestroyNodeAllocator(void* pAllocator);
void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator);
int32_t nodesAllocatorInit();
int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId);
void nodesDestroyAllocator(int64_t refId);
void nodesResetAllocator(int64_t refId);
int64_t nodesIncAllocatorRefCount(int64_t refId);
SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode);
......
......@@ -67,6 +67,7 @@ typedef struct SSchedulerReq {
SRequestConnInfo *pConn;
SArray *pNodeList;
SQueryPlan *pDag;
int64_t allocatorRefId;
const char *sql;
int64_t startTs;
schedulerExecFp execFp;
......
......@@ -250,7 +250,8 @@ typedef struct SRequestObj {
bool inRetry;
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
uint32_t retry;
SNodeAllocator* pNodeAllocator;
// SNodeAllocator* pNodeAllocator;
int64_t allocatorRefId;
} SRequestObj;
typedef struct SSyncQueryParam {
......
......@@ -349,7 +349,7 @@ void doDestroyRequest(void *p) {
taosArrayDestroy(pRequest->tableList);
taosArrayDestroy(pRequest->dbList);
taosArrayDestroy(pRequest->targetTableList);
nodesDestroyNodeAllocator(pRequest->pNodeAllocator);
nodesDestroyAllocator(pRequest->allocatorRefId);
destroyQueryExecRes(&pRequest->body.resInfo.execRes);
......@@ -412,6 +412,7 @@ void taos_init_imp(void) {
initTaskQueue();
fmFuncMgtInit();
nodesAllocatorInit();
clientConnRefPool = taosOpenRef(200, destroyTscObj);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
......
......@@ -194,8 +194,9 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
(*pRequest)->allocatorRefId = -1;
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
if (TSDB_CODE_SUCCESS != nodesCreateNodeAllocator(tsQueryNodeChunkSize, &((*pRequest)->pNodeAllocator))) {
if (TSDB_CODE_SUCCESS != nodesCreateAllocator(tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql);
......@@ -1058,6 +1059,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.allocatorRefId = pRequest->allocatorRefId,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.execFp = schedulerExecCb,
......
......@@ -700,7 +700,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
pRequest->metric.ctgEnd = taosGetTimestampUs();
nodesResetThreadLevelAllocator(pRequest->pNodeAllocator);
nodesResetAllocator(pRequest->allocatorRefId);
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
......@@ -728,11 +728,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
pRequest->requestId);
launchAsyncQuery(pRequest, pQuery, pResultMeta);
qDestroyQuery(pQuery);
nodesResetThreadLevelAllocator(NULL);
nodesResetAllocator(-1);
} else {
destorySqlParseWrapper(pWrapper);
qDestroyQuery(pQuery);
nodesResetThreadLevelAllocator(NULL);
nodesResetAllocator(-1);
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
......@@ -805,7 +805,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
}
SQuery *pQuery = NULL;
nodesResetThreadLevelAllocator(pRequest->pNodeAllocator);
nodesResetAllocator(pRequest->allocatorRefId);
pRequest->metric.syntaxStart = taosGetTimestampUs();
......@@ -849,7 +849,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
&pRequest->body.queryJob);
pCxt = NULL;
if (code == TSDB_CODE_SUCCESS) {
nodesResetThreadLevelAllocator(NULL);
nodesResetAllocator(-1);
return;
}
......@@ -857,7 +857,7 @@ _error:
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
pRequest->requestId);
taosMemoryFree(pCxt);
nodesResetThreadLevelAllocator(NULL);
nodesResetAllocator(-1);
terrno = code;
pRequest->code = code;
......
......@@ -21,6 +21,7 @@
#include "taoserror.h"
#include "tdatablock.h"
#include "thash.h"
#include "tref.h"
typedef struct SNodeMemChunk {
int32_t availableSize;
......@@ -30,6 +31,7 @@ typedef struct SNodeMemChunk {
} SNodeMemChunk;
typedef struct SNodeAllocator {
int64_t self;
int32_t chunkSize;
int32_t chunkNum;
SNodeMemChunk* pCurrChunk;
......@@ -37,6 +39,22 @@ typedef struct SNodeAllocator {
} SNodeAllocator;
static threadlocal SNodeAllocator* pNodeAllocator;
static int32_t allocatorReqRefPool = -1;
int32_t nodesAllocatorInit() {
if (allocatorReqRefPool >= 0) {
nodesWarn("nodes already initialized");
return TSDB_CODE_SUCCESS;
}
allocatorReqRefPool = taosOpenRef(40960, nodesDestroyNodeAllocator);
if (allocatorReqRefPool < 0) {
nodesError("init nodes failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) {
SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize);
......@@ -103,11 +121,13 @@ int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator)
return TSDB_CODE_SUCCESS;
}
void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator) {
if (NULL == pAllocator) {
void nodesDestroyNodeAllocator(void* p) {
if (NULL == p) {
return;
}
SNodeAllocator* pAllocator = p;
nodesDebug("alloc chunkNum: %d, chunkTotakSize: %d", pAllocator->chunkNum,
pAllocator->chunkNum * pAllocator->chunkSize);
......@@ -122,6 +142,40 @@ void nodesDestroyNodeAllocator(SNodeAllocator* pAllocator) {
void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator) { pNodeAllocator = pAllocator; }
int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId) {
SNodeAllocator* pAllocator = NULL;
int32_t code = nodesCreateNodeAllocator(chunkSize, &pAllocator);
if (TSDB_CODE_SUCCESS == code) {
pAllocator->self = taosAddRef(allocatorReqRefPool, pAllocator);
*pRefId = pAllocator->self;
}
return code;
}
void nodesDestroyAllocator(int64_t refId) {
if (refId < 0) {
return;
}
taosReleaseRef(allocatorReqRefPool, refId);
}
void nodesResetAllocator(int64_t refId) {
if (refId < 0) {
pNodeAllocator = NULL;
} else {
pNodeAllocator = taosAcquireRef(allocatorReqRefPool, refId);
taosReleaseRef(allocatorReqRefPool, refId);
}
}
int64_t nodesIncAllocatorRefCount(int64_t refId) {
if (refId < 0) {
return -1;
}
SNodeAllocator* pAllocator = taosAcquireRef(allocatorReqRefPool, refId);
return pAllocator->self;
}
static SNode* makeNode(ENodeType type, int32_t size) {
SNode* p = nodesCalloc(1, size);
if (NULL == p) {
......
......@@ -254,7 +254,8 @@ typedef struct SSchJob {
SRequestConnInfo conn;
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
SArray *levels; // starting from 0. SArray<SSchLevel>
SQueryPlan *pDag;
SQueryPlan *pDag;
int64_t allocatorRefId;
SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx;
......
......@@ -673,6 +673,7 @@ void schFreeJobImpl(void *job) {
destroyQueryExecRes(&pJob->execRes);
qDestroyQueryPlan(pJob->pDag);
nodesDestroyAllocator(pJob->allocatorRefId);
taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->fetchRes);
......@@ -724,6 +725,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->sql = strdup(pReq->sql);
}
pJob->pDag = pReq->pDag;
pJob->allocatorRefId = nodesIncAllocatorRefCount(pReq->allocatorRefId);
pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册