diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index dcccbb17c98472d1b1a13d3eea9a255be0763494..94bd5dd7871fa7fa636bfd9c8aa6ff90097ff3e1 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -179,7 +179,6 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { if (code != 0) { terrno = code; if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash); - taosMemoryFreeClear(output.dbVgroup); tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr()); } else if (output.dbVgroup && output.dbVgroup->vgHash) { @@ -189,12 +188,14 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { if (code1 != TSDB_CODE_SUCCESS) { tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code1)); - taosMemoryFreeClear(output.dbVgroup); } else { catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup); + output.dbVgroup = NULL; } } + taosMemoryFreeClear(output.dbVgroup); + tFreeSUsedbRsp(&usedbRsp); char db[TSDB_DB_NAME_LEN] = {0}; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index dce7adfea9aba3a6cb04e773de7bbf33ae633852..fb9f588bae5c1a5ab14a25a6dbb0e47e831c1e4c 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -642,6 +642,7 @@ void ctgFreeSTableIndex(void *info); void ctgClearSubTaskRes(SCtgSubRes *pRes); void ctgFreeQNode(SCtgQNode *node); void ctgClearHandle(SCatalog* pCtg); +void ctgFreeTbCacheImpl(SCtgTbCache *pCache); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 77c1e5b8b119d349e9fb83b8490123a4846c9cb6..499ce7727676381ad28ea0fb672d768e88ac7907 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -647,6 +647,8 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { CTG_RET(TSDB_CODE_OUT_OF_MEMORY); } + bool syncOp = operation->syncOp; + char* opName = gCtgCacheOperation[operation->opId].name; if (operation->syncOp) { tsem_init(&operation->rspSem, 0, 0); } @@ -664,14 +666,14 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { gCtgMgmt.queue.tail = node; CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); + ctgDebug("action [%s] added into queue", opName); + CTG_QUEUE_INC(); CTG_RT_STAT_INC(numOfOpEnqueue, 1); tsem_post(&gCtgMgmt.queue.reqSem); - ctgDebug("action [%s] added into queue", gCtgCacheOperation[operation->opId].name); - - if (operation->syncOp) { + if (syncOp) { tsem_wait(&operation->rspSem); taosMemoryFree(operation); } @@ -840,6 +842,7 @@ _return: ctgFreeVgInfo(dbInfo); taosMemoryFreeClear(op->data); + taosMemoryFreeClear(op); CTG_RET(code); } @@ -852,7 +855,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg)); - CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } char *p = strchr(output->dbFName, '.'); @@ -871,6 +874,11 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy _return: + if (output) { + taosMemoryFree(output->tbMeta); + taosMemoryFree(output); + } + taosMemoryFreeClear(msg); CTG_RET(code); @@ -1753,6 +1761,16 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { CTG_CACHE_STAT_DEC(numOfStb, 1); } + SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName)); + if (NULL == pTbCache) { + ctgDebug("stb %s already not in cache", msg->stbName); + goto _return; + } + + CTG_LOCK(CTG_WRITE, &pTbCache->metaLock); + ctgFreeTbCacheImpl(pTbCache); + CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); + if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) { ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); } else { @@ -1780,14 +1798,24 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { SCtgDBCache *dbCache = NULL; ctgGetDBCache(pCtg, msg->dbFName, &dbCache); if (NULL == dbCache) { - return TSDB_CODE_SUCCESS; + goto _return; } if (dbCache->dbId != msg->dbId) { ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName); - return TSDB_CODE_SUCCESS; + goto _return; } + SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName)); + if (NULL == pTbCache) { + ctgDebug("tb %s already not in cache", msg->tbName); + goto _return; + } + + CTG_LOCK(CTG_WRITE, &pTbCache->metaLock); + ctgFreeTbCacheImpl(pTbCache); + CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); + if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) { ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -2063,6 +2091,8 @@ void* ctgUpdateThreadFunc(void* param) { if (operation->syncOp) { tsem_post(&operation->rspSem); + } else { + taosMemoryFreeClear(operation); } CTG_RT_STAT_INC(numOfOpDequeue, 1); diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 59ad00952733623007f3ce0b5fca34c88fedf829..8e0a5b7de3753baaf08d4628032f40dcf51c711e 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -261,6 +261,8 @@ int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { _return: + taosMemoryFree(pMsg->pData); + if (pJob) { taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index ad73fe40d2e9606d10bf2b7fc0c207a0c53674e8..a996a70973a18caa8573af224f0bcaa03938e3a3 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -152,6 +152,7 @@ void ctgFreeStbMetaCache(SCtgDBCache *dbCache) { } void ctgFreeTbCacheImpl(SCtgTbCache *pCache) { + qDebug("tbMeta freed, p:%p", pCache->pMeta); taosMemoryFreeClear(pCache->pMeta); if (pCache->pIndex) { taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo); @@ -831,6 +832,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) if (output->tbMeta) { int32_t metaSize = CTG_META_SIZE(output->tbMeta); (*pOutput)->tbMeta = taosMemoryMalloc(metaSize); + qDebug("tbMeta cloned, size:%d, p:%p", metaSize, (*pOutput)->tbMeta); if (NULL == (*pOutput)->tbMeta) { qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput)); taosMemoryFreeClear(*pOutput); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 6bb8db593c13c28802ff6d63ab48f3da54cd0c66..71d123b7998d1c9759758fad33ff3ab411223d5a 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -41,6 +41,8 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { if (pTask->execNodes) { taosHashCleanup(pTask->execNodes); } + + taosMemoryFree(pTask->profile.execTime); } int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel, int32_t levelNum) {