提交 4ad36f71 编写于 作者: D dapan1121

fix: fix reset query cache issue

上级 9c9e8057
......@@ -466,12 +466,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CTG_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2400)
#define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401)
#define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402)
#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403)
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404)
#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405)
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406)
#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407)
#define TSDB_CODE_CTG_EXIT TAOS_DEF_ERROR_CODE(0, 0x2408)
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403)
#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2404)
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2405)
#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2406)
#define TSDB_CODE_CTG_EXIT TAOS_DEF_ERROR_CODE(0, 0x2407)
//scheduler&qworker
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
......
......@@ -205,6 +205,7 @@ typedef struct SCtgJob {
SArray* pTasks;
int32_t taskDone;
SMetaData jobRes;
int32_t jobResCode;
int32_t taskIdx;
SRWLatch taskLock;
......@@ -284,24 +285,26 @@ typedef struct SCtgApiStat {
} SCtgApiStat;
typedef struct SCtgRuntimeStat {
uint64_t qNum;
uint64_t qDoneNum;
uint64_t numOfOpAbort;
uint64_t numOfOpEnqueue;
uint64_t numOfOpDequeue;
} SCtgRuntimeStat;
typedef struct SCtgCacheStat {
uint64_t clusterNum;
uint64_t dbNum;
uint64_t tblNum;
uint64_t stblNum;
uint64_t userNum;
uint64_t vgHitNum;
uint64_t vgMissNum;
uint64_t tbMetaHitNum;
uint64_t tbMetaMissNum;
uint64_t tbIndexHitNum;
uint64_t tbIndexMissNum;
uint64_t userHitNum;
uint64_t userMissNum;
uint64_t numOfCluster;
uint64_t numOfDb;
uint64_t numOfTbl;
uint64_t numOfStb;
uint64_t numOfUser;
uint64_t numOfVgHit;
uint64_t numOfVgMiss;
uint64_t numOfMetaHit;
uint64_t numOfMetaMiss;
uint64_t numOfIndexHit;
uint64_t numOfIndexMiss;
uint64_t numOfUserHit;
uint64_t numOfUserMiss;
uint64_t numOfClear;
} SCtgCacheStat;
typedef struct SCatalogStat {
......@@ -371,6 +374,7 @@ typedef struct SCtgDropTbIndexMsg {
typedef struct SCtgClearCacheMsg {
SCatalog* pCtg;
bool freeCtg;
} SCtgClearCacheMsg;
typedef struct SCtgUpdateEpsetMsg {
......@@ -530,6 +534,21 @@ typedef struct SCtgOperation {
} \
} while (0)
#define CTG_API_LEAVE_NOLOCK(c) do { \
int32_t __code = c; \
CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \
CTG_RET(__code); \
} while (0)
#define CTG_API_ENTER_NOLOCK() do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
CTG_API_LEAVE_NOLOCK(TSDB_CODE_CTG_OUT_OF_SERVICE); \
} \
} while (0)
void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
void ctgdShowClusterCache(SCatalog* pCtg);
int32_t ctgdShowCacheInfo(void);
......@@ -561,7 +580,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
......@@ -596,7 +615,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param);
int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param);
......@@ -623,6 +642,8 @@ int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCt
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
void ctgFreeSTableIndex(void *info);
void ctgClearSubTaskRes(SCtgSubRes *pRes);
void ctgFreeQNode(SCtgQNode *node);
void ctgClearHandle(SCatalog* pCtg);
extern SCatalogMgmt gCtgMgmt;
......
......@@ -99,7 +99,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx*
STableMetaOutput* output = taosMemoryCalloc(1, sizeof(STableMetaOutput));
if (NULL == output) {
ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
......@@ -264,7 +264,7 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
STableMetaOutput* output = taosMemoryCalloc(1, sizeof(STableMetaOutput));
if (NULL == output) {
ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t code = 0;
......@@ -442,7 +442,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
vgList = taosArrayInit(1, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
......@@ -570,7 +570,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
clusterCtg = taosMemoryCalloc(1, sizeof(SCatalog));
if (NULL == clusterCtg) {
qError("calloc %d failed", (int32_t)sizeof(SCatalog));
CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
clusterCtg->clusterId = clusterId;
......@@ -582,7 +582,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
false, HASH_ENTRY_LOCK);
if (NULL == clusterCtg->dbCache) {
qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
clusterCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == clusterCtg->userCache) {
qError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
......@@ -603,7 +609,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
*catalogHandle = clusterCtg;
CTG_CACHE_STAT_INC(clusterNum, 1);
CTG_CACHE_STAT_INC(numOfCluster, 1);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
......@@ -991,7 +997,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalo
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
if (NULL == pRsp->pTableMeta) {
ctgError("taosArrayInit %d failed", tbNum);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < tbNum; ++i) {
......@@ -1006,7 +1012,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalo
if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
ctgError("taosArrayPush failed, idx:%d", i);
taosMemoryFreeClear(pTableMeta);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
}
......@@ -1041,14 +1047,9 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SC
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0, taskNum = 0;
int32_t code = 0;
SCtgJob *pJob = NULL;
CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, pReq, fp, param, &taskNum));
if (taskNum <= 0) {
SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData));
fp(pMetaData, param, TSDB_CODE_SUCCESS);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, pReq, fp, param));
CTG_ERR_JRET(ctgLaunchJob(pJob));
......@@ -1056,6 +1057,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SC
// *jobId = pJob->refId;
_return:
if (pJob) {
taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);
......@@ -1257,19 +1259,19 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
}
int32_t catalogClearCache(void) {
CTG_API_ENTER();
CTG_API_ENTER_NOLOCK();
qInfo("start to clear catalog cache");
if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
CTG_API_LEAVE_NOLOCK(TSDB_CODE_SUCCESS);
}
int32_t code = ctgClearCacheEnqueue(NULL, false, true);
int32_t code = ctgClearCacheEnqueue(NULL, false, false, true);
qInfo("clear catalog cache end, code: %s", tstrerror(code));
CTG_API_LEAVE(code);
CTG_API_LEAVE_NOLOCK(code);
}
......@@ -1282,7 +1284,7 @@ void catalogDestroy(void) {
atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
ctgClearCacheEnqueue(NULL, true, true);
ctgClearCacheEnqueue(NULL, true, true, true);
taosHashCleanup(gCtgMgmt.pCluster);
gCtgMgmt.pCluster = NULL;
......
......@@ -427,7 +427,7 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param) {
int32_t code = 0;
int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
......@@ -443,11 +443,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
if (*taskNum <= 0) {
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, pConn->requestId);
return TSDB_CODE_SUCCESS;
}
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
if (NULL == *job) {
......@@ -477,15 +473,15 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
pJob->tbCfgNum = tbCfgNum;
pJob->svrVerNum = svrVerNum;
pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask));
if (NULL == pJob->pTasks) {
ctgError("taosArrayInit %d tasks failed", *taskNum);
ctgError("taosArrayInit %d tasks failed", taskNum);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
if (pReq->forceUpdate) {
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, *taskNum, pJob, pReq));
if (pReq->forceUpdate && taskNum) {
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, taskNum, pJob, pReq));
}
for (int32_t i = 0; i < dbVgNum; ++i) {
......@@ -558,11 +554,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, *taskNum, pReq->forceUpdate);
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate);
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(*job);
CTG_RET(code);
}
......@@ -763,7 +760,7 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInvokeSubCb(SCtgTask *pTask) {
int32_t ctgCallSubCb(SCtgTask *pTask) {
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &pTask->lock);
......@@ -790,6 +787,15 @@ _return:
CTG_RET(code);
}
int32_t ctgCallUserCb(void* param) {
SCtgJob* pJob = (SCtgJob*)param;
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
return TSDB_CODE_SUCCESS;
}
int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
SCtgJob* pJob = pTask->pJob;
......@@ -804,7 +810,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
pTask->code = rspCode;
pTask->status = CTG_TASK_DONE;
ctgInvokeSubCb(pTask);
ctgCallSubCb(pTask);
int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1);
if (taskDone < taosArrayGetSize(pJob->pTasks)) {
......@@ -818,9 +824,9 @@ _return:
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code));
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, code);
taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
pJob->jobResCode = code;
taosAsyncExec(ctgCallUserCb, pJob, NULL);
CTG_RET(code);
}
......@@ -1697,6 +1703,12 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
pTask->status = CTG_TASK_LAUNCHED;
}
if (taskNum <= 0) {
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
taosAsyncExec(ctgCallUserCb, pJob, NULL);
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -214,7 +214,7 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac
*pCache = dbCache;
CTG_CACHE_STAT_INC(vgHitNum, 1);
CTG_CACHE_STAT_INC(numOfVgHit, 1);
ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
......@@ -228,7 +228,7 @@ _return:
*pCache = NULL;
CTG_CACHE_STAT_INC(vgMissNum, 1);
CTG_CACHE_STAT_INC(numOfVgMiss, 1);
return TSDB_CODE_SUCCESS;
}
......@@ -260,7 +260,7 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S
ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(tbMetaHitNum, 1);
CTG_CACHE_STAT_INC(numOfMetaHit, 1);
return TSDB_CODE_SUCCESS;
......@@ -268,7 +268,7 @@ _return:
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_CACHE_STAT_INC(tbMetaMissNum, 1);
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
return TSDB_CODE_SUCCESS;
}
......@@ -307,7 +307,7 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid,
ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
CTG_CACHE_STAT_INC(tbMetaHitNum, 1);
CTG_CACHE_STAT_INC(numOfMetaHit, 1);
return TSDB_CODE_SUCCESS;
......@@ -315,7 +315,7 @@ _return:
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_CACHE_STAT_INC(tbMetaMissNum, 1);
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
*pDb = NULL;
*pTb = NULL;
......@@ -351,7 +351,7 @@ int32_t ctgAcquireTbIndexFromCache(SCatalog* pCtg, char *dbFName, char* tbName,
ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(tbIndexHitNum, 1);
CTG_CACHE_STAT_INC(numOfIndexHit, 1);
return TSDB_CODE_SUCCESS;
......@@ -359,7 +359,7 @@ _return:
ctgReleaseTbIndexToCache(pCtg, dbCache, pCache);
CTG_CACHE_STAT_INC(tbIndexMissNum, 1);
CTG_CACHE_STAT_INC(numOfIndexMiss, 1);
return TSDB_CODE_SUCCESS;
}
......@@ -455,7 +455,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
*pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
......@@ -583,11 +583,6 @@ _return:
}
int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
if (NULL == pCtg->userCache) {
ctgDebug("empty user auth cache, user:%s", user);
goto _return;
}
SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user));
if (NULL == pUser) {
ctgDebug("user not in cache, user:%s", user);
......@@ -597,7 +592,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE
*inCache = true;
ctgDebug("Got user from cache, user:%s", user);
CTG_CACHE_STAT_INC(userHitNum, 1);
CTG_CACHE_STAT_INC(numOfUserHit, 1);
if (pUser->superUser) {
*pass = true;
......@@ -626,7 +621,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE
_return:
*inCache = false;
CTG_CACHE_STAT_INC(userMissNum, 1);
CTG_CACHE_STAT_INC(numOfUserMiss, 1);
return TSDB_CODE_SUCCESS;
}
......@@ -649,7 +644,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == node) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (operation->syncOp) {
......@@ -670,7 +665,7 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
CTG_QUEUE_INC();
CTG_RT_STAT_INC(qNum, 1);
CTG_RT_STAT_INC(numOfOpEnqueue, 1);
tsem_post(&gCtgMgmt.queue.reqSem);
......@@ -693,7 +688,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId)
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
char *p = strchr(dbFName, '.');
......@@ -726,7 +721,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp)
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
char *p = strchr(dbFName, '.');
......@@ -760,7 +755,7 @@ int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
......@@ -792,7 +787,7 @@ int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId,
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
......@@ -822,7 +817,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
ctgFreeVgInfo(dbInfo);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
char *p = strchr(dbFName, '.');
......@@ -857,7 +852,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
char *p = strchr(output->dbFName, '.');
......@@ -889,7 +884,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
......@@ -921,7 +916,7 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
......@@ -950,7 +945,7 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncO
SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
......@@ -981,7 +976,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp) {
SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
......@@ -1002,7 +997,7 @@ _return:
}
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp) {
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_CLEAR_CACHE;
......@@ -1012,10 +1007,11 @@ int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp) {
SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
msg->freeCtg = freeCtg;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
......@@ -1040,7 +1036,7 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slots = taosMemoryCalloc(1, msgSize);
if (NULL == mgmt->slots) {
qError("calloc %d failed", (int32_t)msgSize);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
......@@ -1060,13 +1056,13 @@ int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size)
slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
if (NULL == slot->meta) {
qError("taosArrayInit %d failed, id:0x%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
if (NULL == taosArrayPush(slot->meta, meta)) {
qError("taosArrayPush meta to rent failed, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
slot->needSort = true;
......@@ -1184,7 +1180,7 @@ int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_
*res = taosMemoryMalloc(msize);
if (NULL == *res) {
qError("malloc %d failed", (int32_t)msize);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
void *meta = taosArrayGet(slot->meta, 0);
......@@ -1234,13 +1230,13 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.tbCache) {
ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.stbCache) {
ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
......@@ -1251,10 +1247,10 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
}
ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_CACHE_STAT_INC(dbNum, 1);
CTG_CACHE_STAT_INC(numOfDb, 1);
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
......@@ -1315,7 +1311,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
CTG_CACHE_STAT_DEC(dbNum, 1);
CTG_CACHE_STAT_DEC(numOfDb, 1);
ctgInfo("db removed from cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId);
return TSDB_CODE_SUCCESS;
......@@ -1412,7 +1408,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid);
} else {
CTG_CACHE_STAT_DEC(stblNum, 1);
CTG_CACHE_STAT_DEC(numOfStb, 1);
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid);
}
......@@ -1426,7 +1422,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) {
taosMemoryFree(meta);
ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
......@@ -1436,7 +1432,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
}
if (NULL == orig) {
CTG_CACHE_STAT_INC(tblNum, 1);
CTG_CACHE_STAT_INC(numOfTbl, 1);
}
ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
......@@ -1448,10 +1444,10 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
if (origSuid != meta->suid && taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) {
ctgError("taosHashPut to stable cache failed, suid:0x%"PRIx64, meta->suid);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_CACHE_STAT_INC(stblNum, 1);
CTG_CACHE_STAT_INC(numOfStb, 1);
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, meta->tableType);
......@@ -1479,7 +1475,7 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa
ctgFreeSTableIndex(*index);
taosMemoryFreeClear(*index);
ctgError("taosHashPut new tbCache failed, tbName:%s", tbName);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
*index = NULL;
......@@ -1527,7 +1523,7 @@ _return:
CTG_RET(code);
}
void ctgClearAllCtgInstance(void) {
void ctgClearAllInstance(void) {
SCatalog* pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
......@@ -1535,7 +1531,24 @@ void ctgClearAllCtgInstance(void) {
pCtg = *(SCatalog**)pIter;
if (pCtg) {
catalogFreeHandle(pCtg);
ctgClearHandle(pCtg);
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
}
taosHashClear(gCtgMgmt.pCluster);
}
void ctgFreeAllInstance(void) {
SCatalog* pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog**)pIter;
if (pCtg) {
ctgFreeHandle(pCtg);
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
......@@ -1559,7 +1572,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d",
dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
bool newAdded = false;
......@@ -1739,13 +1752,13 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
} else {
CTG_CACHE_STAT_DEC(stblNum, 1);
CTG_CACHE_STAT_DEC(numOfStb, 1);
}
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
} else {
CTG_CACHE_STAT_DEC(tblNum, 1);
CTG_CACHE_STAT_DEC(numOfTbl, 1);
}
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
......@@ -1781,7 +1794,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} else {
CTG_CACHE_STAT_DEC(tblNum, 1);
CTG_CACHE_STAT_DEC(numOfTbl, 1);
}
ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName);
......@@ -1797,14 +1810,6 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateUserMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
if (NULL == pCtg->userCache) {
pCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pCtg->userCache) {
ctgError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user));
if (NULL == pUser) {
......@@ -1962,14 +1967,27 @@ int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
SCtgClearCacheMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
if (pCtg) {
catalogFreeHandle(pCtg);
if (msg->freeCtg) {
ctgFreeHandle(pCtg);
} else {
ctgClearHandle(pCtg);
}
goto _return;
}
ctgClearAllCtgInstance();
if (msg->freeCtg) {
ctgFreeAllInstance();
} else {
ctgClearAllInstance();
}
_return:
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
taosMemoryFreeClear(msg);
......@@ -1979,16 +1997,29 @@ _return:
void ctgCleanupCacheQueue(void) {
SCtgQNode *node = NULL;
SCtgQNode *nodeNext = NULL;
SCtgCacheOperation *op = NULL;
bool stopQueue = false;
while (true) {
node = gCtgMgmt.queue.head->next;
while (node) {
if (node->op) {
taosMemoryFree(node->op->data);
if (node->op->syncOp) {
tsem_post(&node->op->rspSem);
op = node->op;
if (op->stopQueue) {
SCatalog *pCtg = ((SCtgUpdateMsgHeader *)op->data)->pCtg;
ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name);
(*gCtgCacheOperation[op->opId].func)(op);
stopQueue = true;
CTG_RT_STAT_INC(numOfOpDequeue, 1);
} else {
taosMemoryFree(op->data);
CTG_RT_STAT_INC(numOfOpAbort, 1);
}
if (op->syncOp) {
tsem_post(&op->rspSem);
} else {
taosMemoryFree(node->op);
taosMemoryFree(op);
}
}
......@@ -1998,7 +2029,7 @@ void ctgCleanupCacheQueue(void) {
node = nodeNext;
}
if (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
if (!stopQueue) {
taosUsleep(1);
} else {
break;
......@@ -2036,7 +2067,7 @@ void* ctgUpdateThreadFunc(void* param) {
tsem_post(&operation->rspSem);
}
CTG_RT_STAT_INC(qDoneNum, 1);
CTG_RT_STAT_INC(numOfOpDequeue, 1);
ctgdShowCacheInfo();
ctgdShowClusterCache(pCtg);
......
......@@ -19,7 +19,7 @@
#include "catalogInt.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {.cacheEnable = true};
SCtgDebug gCTGDebug = {.lockEnable = true, .apiEnable = true};
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1);
......@@ -255,8 +255,8 @@ int32_t ctgdEnableDebug(char *option) {
}
int32_t ctgdGetStatNum(char *option, void *res) {
if (0 == strcasecmp(option, "runtime.qDoneNum")) {
*(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.qDoneNum);
if (0 == strcasecmp(option, "runtime.numOfOpDequeue")) {
*(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.numOfOpDequeue);
return TSDB_CODE_SUCCESS;
}
......
......@@ -145,10 +145,10 @@ void ctgFreeStbMetaCache(SCtgDBCache *dbCache) {
return;
}
int32_t stblNum = taosHashGetSize(dbCache->stbCache);
int32_t stbNum = taosHashGetSize(dbCache->stbCache);
taosHashCleanup(dbCache->stbCache);
dbCache->stbCache = NULL;
CTG_CACHE_STAT_DEC(stblNum, stblNum);
CTG_CACHE_STAT_DEC(numOfStb, stbNum);
}
void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
......@@ -172,7 +172,7 @@ void ctgFreeTbCache(SCtgDBCache *dbCache) {
}
taosHashCleanup(dbCache->tbCache);
dbCache->tbCache = NULL;
CTG_CACHE_STAT_DEC(tblNum, tblNum);
CTG_CACHE_STAT_DEC(numOfTbl, tblNum);
}
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
......@@ -202,41 +202,53 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) {
ctgFreeTbCache(dbCache);
}
void ctgFreeHandleImpl(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
void ctgFreeInstDbCache(SHashObj* pDbCache) {
if (NULL == pDbCache) {
return;
}
if (pCtg->dbCache) {
int32_t dbNum = taosHashGetSize(pCtg->dbCache);
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
SCtgDBCache *dbCache = pIter;
atomic_store_8(&dbCache->deleted, 1);
ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pCtg->dbCache, pIter);
}
int32_t dbNum = taosHashGetSize(pDbCache);
void *pIter = taosHashIterate(pDbCache, NULL);
while (pIter) {
SCtgDBCache *dbCache = pIter;
atomic_store_8(&dbCache->deleted, 1);
ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pDbCache, pIter);
}
taosHashCleanup(pDbCache);
CTG_CACHE_STAT_DEC(numOfDb, dbNum);
}
taosHashCleanup(pCtg->dbCache);
CTG_CACHE_STAT_DEC(dbNum, dbNum);
void ctgFreeInstUserCache(SHashObj* pUserCache) {
if (NULL == pUserCache) {
return;
}
int32_t userNum = taosHashGetSize(pUserCache);
void *pIter = taosHashIterate(pUserCache, NULL);
while (pIter) {
SCtgUserAuth *userCache = pIter;
ctgFreeSCtgUserAuth(userCache);
pIter = taosHashIterate(pUserCache, pIter);
}
taosHashCleanup(pUserCache);
CTG_CACHE_STAT_DEC(numOfUser, userNum);
}
if (pCtg->userCache) {
int32_t userNum = taosHashGetSize(pCtg->userCache);
void *pIter = taosHashIterate(pCtg->userCache, NULL);
while (pIter) {
SCtgUserAuth *userCache = pIter;
ctgFreeSCtgUserAuth(userCache);
pIter = taosHashIterate(pCtg->userCache, pIter);
}
void ctgFreeHandleImpl(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
taosHashCleanup(pCtg->userCache);
CTG_CACHE_STAT_DEC(userNum, userNum);
}
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
taosMemoryFree(pCtg);
}
......@@ -247,21 +259,51 @@ void ctgFreeHandle(SCatalog* pCtg) {
return;
}
if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:0x%" PRIx64, pCtg->clusterId);
return;
}
uint64_t clusterId = pCtg->clusterId;
CTG_CACHE_STAT_DEC(clusterNum, 1);
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
uint64_t clusterId = pCtg->clusterId;
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
ctgFreeHandleImpl(pCtg);
CTG_CACHE_STAT_DEC(numOfCluster, 1);
taosMemoryFree(pCtg);
ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId);
}
void ctgClearHandle(SCatalog* pCtg) {
if (NULL == pCtg) {
return;
}
uint64_t clusterId = pCtg->clusterId;
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB);
ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE);
pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pCtg->dbCache) {
qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
}
pCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pCtg->userCache) {
ctgError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum);
}
CTG_CACHE_STAT_INC(numOfClear, 1);
ctgInfo("handle cleared, culsterId:0x%" PRIx64, clusterId);
}
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
if (NULL == pOutput) {
......@@ -615,7 +657,7 @@ int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit failed, num:%d", vgNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
void *pIter = taosHashIterate(vgHash, NULL);
......@@ -625,7 +667,7 @@ int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
taosHashCancelIterate(vgHash, pIter);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pIter = taosHashIterate(vgHash, pIter);
......@@ -742,7 +784,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
*dst = taosMemoryMalloc(sizeof(SDBVgInfo));
if (NULL == *dst) {
qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(*dst, src, sizeof(SDBVgInfo));
......@@ -752,7 +794,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
if (NULL == (*dst)->vgHash) {
qError("taosHashInit %d failed", (int32_t)hashSize);
taosMemoryFreeClear(*dst);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t *vgId = NULL;
......@@ -765,7 +807,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
taosHashCancelIterate(src->vgHash, pIter);
taosHashCleanup((*dst)->vgHash);
taosMemoryFreeClear(*dst);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pIter = taosHashIterate(src->vgHash, pIter);
......@@ -781,7 +823,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
*pOutput = taosMemoryMalloc(sizeof(STableMetaOutput));
if (NULL == *pOutput) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(*pOutput, output, sizeof(STableMetaOutput));
......@@ -792,7 +834,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
if (NULL == (*pOutput)->tbMeta) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
taosMemoryFreeClear(*pOutput);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);
......
......@@ -1386,7 +1386,7 @@ TEST(tableMeta, updateStbMeta) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n != 3) {
taosMsleep(50);
} else {
......@@ -1456,7 +1456,7 @@ TEST(refreshGetMeta, normal2normal) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1535,7 +1535,7 @@ TEST(refreshGetMeta, normal2notexist) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1609,7 +1609,7 @@ TEST(refreshGetMeta, normal2child) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1693,7 +1693,7 @@ TEST(refreshGetMeta, stable2child) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1778,7 +1778,7 @@ TEST(refreshGetMeta, stable2stable) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1866,7 +1866,7 @@ TEST(refreshGetMeta, child2stable) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -2083,7 +2083,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -2109,7 +2109,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n != 3) {
taosMsleep(50);
} else {
......
......@@ -416,7 +416,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
taosHashCancelIterate(pSrc->vgHash, pIter);
taosHashCleanup((*pDst)->vgHash);
taosMemoryFreeClear(*pDst);
return TSDB_CODE_CTG_MEM_ERROR;
return TSDB_CODE_OUT_OF_MEMORY;
}
pIter = taosHashIterate(pSrc->vgHash, pIter);
......
......@@ -453,7 +453,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "catalog memory error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册