提交 5116e0a0 编写于 作者: X Xiaoyu Wang

enh: added memory allocators for parser and planner

上级 3abfb569
......@@ -277,15 +277,14 @@ typedef struct SNodeList {
typedef struct SNodeAllocator SNodeAllocator;
int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator);
void nodesDestroyNodeAllocator(void* pAllocator);
void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator);
int32_t nodesAllocatorInit();
int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId);
void nodesDestroyAllocator(int64_t refId);
void nodesResetAllocator(int64_t refId);
int64_t nodesIncAllocatorRefCount(int64_t refId);
int32_t nodesInitAllocatorSet();
void nodesDestroyAllocatorSet();
int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId);
int32_t nodesAcquireAllocator(int64_t allocatorId);
int32_t nodesReleaseAllocator(int64_t allocatorId);
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId);
int64_t nodesReleaseAllocatorWeakRef(int64_t allocatorId);
void nodesDestroyAllocator(int64_t allocatorId);
SNode* nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNode* pNode);
......
......@@ -56,6 +56,7 @@ typedef struct SParseContext {
bool nodeOffline;
SArray* pTableMetaPos; // sql table pos => catalog data pos
SArray* pTableVgroupPos; // sql table pos => catalog data pos
int64_t allocatorId;
} SParseContext;
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
......
......@@ -39,6 +39,7 @@ typedef struct SPlanContext {
int32_t msgLen;
const char* pUser;
bool sysInfo;
int64_t allocatorId;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......
......@@ -413,7 +413,7 @@ void taos_init_imp(void) {
initTaskQueue();
fmFuncMgtInit();
nodesAllocatorInit();
nodesInitAllocatorSet();
clientConnRefPool = taosOpenRef(200, destroyTscObj);
clientReqRefPool = taosOpenRef(40960, doDestroyRequest);
......
......@@ -197,7 +197,8 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
(*pRequest)->allocatorRefId = -1;
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
if (TSDB_CODE_SUCCESS != nodesCreateAllocator(tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
if (TSDB_CODE_SUCCESS !=
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
tscError("%d failed to create node allocator, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql);
......@@ -1035,7 +1036,8 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pMsg = pRequest->msgBuf,
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
.pUser = pRequest->pTscObj->user,
.sysInfo = pRequest->pTscObj->sysInfo};
.sysInfo = pRequest->pTscObj->sysInfo,
.allocatorId = pRequest->allocatorRefId};
SAppInstInfo* pAppInfo = getAppInfo(pRequest);
SQueryPlan* pDag = NULL;
......@@ -1047,8 +1049,6 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
pRequest->body.subplanNum = pDag->numOfSubplans;
}
nodesResetAllocator(-1);
pRequest->metric.planEnd = taosGetTimestampUs();
if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
......
......@@ -700,8 +700,6 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
pRequest->metric.ctgEnd = taosGetTimestampUs();
nodesResetAllocator(pRequest->allocatorRefId);
if (code == TSDB_CODE_SUCCESS) {
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
pRequest->stableQuery = pQuery->stableQuery;
......@@ -731,7 +729,6 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
} else {
destorySqlParseWrapper(pWrapper);
qDestroyQuery(pQuery);
nodesResetAllocator(-1);
if (NEED_CLIENT_HANDLE_ERROR(code)) {
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
......@@ -778,7 +775,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
.enableSysInfo = pTscObj->sysInfo,
.async = true,
.svrVer = pTscObj->sVer,
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes)};
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
.allocatorId = pRequest->allocatorRefId};
return TSDB_CODE_SUCCESS;
}
......@@ -804,13 +802,11 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
}
SQuery *pQuery = NULL;
nodesResetAllocator(pRequest->allocatorRefId);
pRequest->metric.syntaxStart = taosGetTimestampUs();
SCatalogReq catalogReq = {.forceUpdate = updateMetaForce, .qNodeRequired = qnodeRequired(pRequest)};
code = qParseSqlSyntax(pCxt, &pQuery, &catalogReq);
nodesResetAllocator(-1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......
......@@ -32,29 +32,16 @@ typedef struct SNodeMemChunk {
typedef struct SNodeAllocator {
int64_t self;
int64_t queryId;
int32_t chunkSize;
int32_t chunkNum;
SNodeMemChunk* pCurrChunk;
SNodeMemChunk* pChunks;
TdThreadMutex mutex;
} SNodeAllocator;
static threadlocal SNodeAllocator* pNodeAllocator;
static int32_t allocatorReqRefPool = -1;
int32_t nodesAllocatorInit() {
if (allocatorReqRefPool >= 0) {
nodesWarn("nodes already initialized");
return TSDB_CODE_SUCCESS;
}
allocatorReqRefPool = taosOpenRef(40960, nodesDestroyNodeAllocator);
if (allocatorReqRefPool < 0) {
nodesError("init nodes failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
static threadlocal SNodeAllocator* g_pNodeAllocator;
static int32_t g_allocatorReqRefPool = -1;
static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) {
SNodeMemChunk* pNewChunk = taosMemoryCalloc(1, sizeof(SNodeMemChunk) + pAllocator->chunkSize);
......@@ -77,17 +64,17 @@ static SNodeMemChunk* callocNodeChunk(SNodeAllocator* pAllocator) {
}
static void* nodesCallocImpl(int32_t size) {
if (NULL == pNodeAllocator) {
if (NULL == g_pNodeAllocator) {
return taosMemoryCalloc(1, size);
}
if (pNodeAllocator->pCurrChunk->usedSize + size > pNodeAllocator->pCurrChunk->availableSize) {
if (NULL == callocNodeChunk(pNodeAllocator)) {
if (g_pNodeAllocator->pCurrChunk->usedSize + size > g_pNodeAllocator->pCurrChunk->availableSize) {
if (NULL == callocNodeChunk(g_pNodeAllocator)) {
return NULL;
}
}
void* p = pNodeAllocator->pCurrChunk->pBuf + pNodeAllocator->pCurrChunk->usedSize;
pNodeAllocator->pCurrChunk->usedSize += size;
void* p = g_pNodeAllocator->pCurrChunk->pBuf + g_pNodeAllocator->pCurrChunk->usedSize;
g_pNodeAllocator->pCurrChunk->usedSize += size;
return p;
}
......@@ -96,7 +83,7 @@ static void* nodesCalloc(int32_t num, int32_t size) {
if (NULL == p) {
return NULL;
}
*(char*)p = (NULL != pNodeAllocator) ? 1 : 0;
*(char*)p = (NULL != g_pNodeAllocator) ? 1 : 0;
return (char*)p + 1;
}
......@@ -108,7 +95,7 @@ static void nodesFree(void* p) {
return;
}
int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) {
static int32_t createNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator) {
*pAllocator = taosMemoryCalloc(1, sizeof(SNodeAllocator));
if (NULL == *pAllocator) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -118,18 +105,19 @@ int32_t nodesCreateNodeAllocator(int32_t chunkSize, SNodeAllocator** pAllocator)
taosMemoryFreeClear(*pAllocator);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosThreadMutexInit(&(*pAllocator)->mutex, NULL);
return TSDB_CODE_SUCCESS;
}
void nodesDestroyNodeAllocator(void* p) {
static void destroyNodeAllocator(void* p) {
if (NULL == p) {
return;
}
SNodeAllocator* pAllocator = p;
nodesDebug("alloc chunkNum: %d, chunkTotakSize: %d", pAllocator->chunkNum,
pAllocator->chunkNum * pAllocator->chunkSize);
nodesDebug("query id %" PRIx64 " allocator id %" PRIx64 " alloc chunkNum: %d, chunkTotakSize: %d",
pAllocator->queryId, pAllocator->self, pAllocator->chunkNum, pAllocator->chunkNum * pAllocator->chunkSize);
SNodeMemChunk* pChunk = pAllocator->pChunks;
while (NULL != pChunk) {
......@@ -137,45 +125,103 @@ void nodesDestroyNodeAllocator(void* p) {
taosMemoryFree(pChunk);
pChunk = pTemp;
}
taosThreadMutexDestroy(&pAllocator->mutex);
taosMemoryFree(pAllocator);
}
void nodesResetThreadLevelAllocator(SNodeAllocator* pAllocator) { pNodeAllocator = pAllocator; }
int32_t nodesInitAllocatorSet() {
if (g_allocatorReqRefPool >= 0) {
nodesWarn("nodes already initialized");
return TSDB_CODE_SUCCESS;
}
g_allocatorReqRefPool = taosOpenRef(1024, destroyNodeAllocator);
if (g_allocatorReqRefPool < 0) {
nodesError("init nodes failed");
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
void nodesDestroyAllocatorSet() {
if (g_allocatorReqRefPool >= 0) {
SNodeAllocator* pAllocator = taosIterateRef(g_allocatorReqRefPool, 0);
int64_t refId = 0;
while (NULL != pAllocator) {
refId = pAllocator->self;
taosRemoveRef(g_allocatorReqRefPool, refId);
pAllocator = taosIterateRef(g_allocatorReqRefPool, refId);
}
taosCloseRef(g_allocatorReqRefPool);
}
}
int32_t nodesCreateAllocator(int32_t chunkSize, int64_t* pRefId) {
int32_t nodesCreateAllocator(int64_t queryId, int32_t chunkSize, int64_t* pAllocatorId) {
SNodeAllocator* pAllocator = NULL;
int32_t code = nodesCreateNodeAllocator(chunkSize, &pAllocator);
int32_t code = createNodeAllocator(chunkSize, &pAllocator);
if (TSDB_CODE_SUCCESS == code) {
pAllocator->self = taosAddRef(allocatorReqRefPool, pAllocator);
*pRefId = pAllocator->self;
pAllocator->self = taosAddRef(g_allocatorReqRefPool, pAllocator);
if (pAllocator->self <= 0) {
return terrno;
}
pAllocator->queryId = queryId;
*pAllocatorId = pAllocator->self;
}
return code;
}
void nodesDestroyAllocator(int64_t refId) {
if (refId <= 0) {
return;
int32_t nodesAcquireAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return TSDB_CODE_SUCCESS;
}
taosReleaseRef(allocatorReqRefPool, refId);
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
if (NULL == pAllocator) {
return terrno;
}
taosThreadMutexLock(&pAllocator->mutex);
g_pNodeAllocator = pAllocator;
return TSDB_CODE_SUCCESS;
}
void nodesResetAllocator(int64_t refId) {
if (refId <= 0) {
pNodeAllocator = NULL;
} else {
pNodeAllocator = taosAcquireRef(allocatorReqRefPool, refId);
taosReleaseRef(allocatorReqRefPool, refId);
int32_t nodesReleaseAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return TSDB_CODE_SUCCESS;
}
if (NULL == g_pNodeAllocator) {
nodesError("allocator id %" PRIx64
" release failed: The nodesReleaseAllocator function needs to be called after the nodesAcquireAllocator "
"function is called!",
allocatorId);
return TSDB_CODE_FAILED;
}
SNodeAllocator* pAllocator = g_pNodeAllocator;
g_pNodeAllocator = NULL;
taosThreadMutexUnlock(&pAllocator->mutex);
return taosReleaseRef(g_allocatorReqRefPool, allocatorId);
}
int64_t nodesIncAllocatorRefCount(int64_t refId) {
if (refId <= 0) {
return -1;
int64_t nodesMakeAllocatorWeakRef(int64_t allocatorId) {
if (allocatorId <= 0) {
return 0;
}
SNodeAllocator* pAllocator = taosAcquireRef(allocatorReqRefPool, refId);
SNodeAllocator* pAllocator = taosAcquireRef(g_allocatorReqRefPool, allocatorId);
return pAllocator->self;
}
int64_t nodesReleaseAllocatorWeakRef(int64_t allocatorId) { return taosReleaseRef(g_allocatorReqRefPool, allocatorId); }
void nodesDestroyAllocator(int64_t allocatorId) {
if (allocatorId <= 0) {
return;
}
taosRemoveRef(g_allocatorReqRefPool, allocatorId);
}
static SNode* makeNode(ENodeType type, int32_t size) {
SNode* p = nodesCalloc(1, size);
if (NULL == p) {
......
......@@ -177,15 +177,18 @@ int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq) {
SParseMetaCache metaCache = {0};
int32_t code = TSDB_CODE_SUCCESS;
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSyntax(pCxt, pQuery, &metaCache);
} else {
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
if (qIsInsertValuesSql(pCxt->pSql, pCxt->sqlLen)) {
code = parseInsertSyntax(pCxt, pQuery, &metaCache);
} else {
code = parseSqlSyntax(pCxt, pQuery, &metaCache);
}
}
if (TSDB_CODE_SUCCESS == code) {
code = buildCatalogReq(pCxt, &metaCache, pCatalogReq);
}
code = nodesReleaseAllocator(pCxt->allocatorId);
destoryParseMetaCache(&metaCache, true);
terrno = code;
return code;
......@@ -194,7 +197,10 @@ int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq
int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCatalogReq,
const struct SMetaData* pMetaData, SQuery* pQuery) {
SParseMetaCache metaCache = {0};
int32_t code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot);
int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
code = putMetaDataToCache(pCatalogReq, pMetaData, &metaCache, NULL == pQuery->pRoot);
}
if (TSDB_CODE_SUCCESS == code) {
if (NULL == pQuery->pRoot) {
code = parseInsertSql(pCxt, &pQuery, &metaCache);
......@@ -202,6 +208,7 @@ int32_t qAnalyseSqlSemantic(SParseContext* pCxt, const struct SCatalogReq* pCata
code = analyseSemantic(pCxt, pQuery, &metaCache);
}
}
code = nodesReleaseAllocator(pCxt->allocatorId);
destoryParseMetaCache(&metaCache, false);
terrno = code;
return code;
......
......@@ -33,7 +33,10 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
SLogicSubplan* pLogicSubplan = NULL;
SQueryLogicPlan* pLogicPlan = NULL;
int32_t code = createLogicPlan(pCxt, &pLogicSubplan);
int32_t code = nodesAcquireAllocator(pCxt->allocatorId);
if (TSDB_CODE_SUCCESS == code) {
code = createLogicPlan(pCxt, &pLogicSubplan);
}
if (TSDB_CODE_SUCCESS == code) {
code = optimizeLogicPlan(pCxt, pLogicSubplan);
}
......@@ -49,6 +52,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
if (TSDB_CODE_SUCCESS == code) {
dumpQueryPlan(*pPlan);
}
code = nodesReleaseAllocator(pCxt->allocatorId);
nodesDestroyNode((SNode*)pLogicSubplan);
nodesDestroyNode((SNode*)pLogicPlan);
......
......@@ -32,6 +32,7 @@ class PlannerEnv : public testing::Environment {
generateMetaData();
initLog(TD_TMP_DIR_PATH "td");
initCfg();
nodesInitAllocatorSet();
}
virtual void TearDown() {
......@@ -39,6 +40,7 @@ class PlannerEnv : public testing::Environment {
qCleanupKeywordsTable();
fmFuncMgtDestroy();
taosCloseLog();
nodesDestroyAllocatorSet();
}
PlannerEnv() {}
......
......@@ -129,10 +129,10 @@ class PlannerTestBaseImpl {
}
void runImpl(const string& sql, int32_t queryPolicy) {
SNodeAllocator* pAllocator = NULL;
int64_t allocatorId = 0;
if (g_useNodeAllocator) {
nodesCreateNodeAllocator(32 * 1024, &pAllocator);
nodesResetThreadLevelAllocator(pAllocator);
nodesCreateAllocator(sqlNo_, 32 * 1024, &allocatorId);
nodesAcquireAllocator(allocatorId);
}
reset();
......@@ -166,13 +166,13 @@ class PlannerTestBaseImpl {
dump(g_dumpModule);
} catch (...) {
dump(DUMP_MODULE_ALL);
nodesDestroyNodeAllocator(pAllocator);
nodesResetThreadLevelAllocator(NULL);
nodesReleaseAllocator(allocatorId);
nodesDestroyAllocator(allocatorId);
throw;
}
nodesDestroyNodeAllocator(pAllocator);
nodesResetThreadLevelAllocator(NULL);
nodesReleaseAllocator(allocatorId);
nodesDestroyAllocator(allocatorId);
}
void prepare(const string& sql) {
......
......@@ -673,7 +673,7 @@ void schFreeJobImpl(void *job) {
destroyQueryExecRes(&pJob->execRes);
qDestroyQueryPlan(pJob->pDag);
nodesDestroyAllocator(pJob->allocatorRefId);
nodesReleaseAllocatorWeakRef(pJob->allocatorRefId);
taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->fetchRes);
......@@ -725,7 +725,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->sql = strdup(pReq->sql);
}
pJob->pDag = pReq->pDag;
pJob->allocatorRefId = nodesIncAllocatorRefCount(pReq->allocatorRefId);
pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册