未验证 提交 7beac65a 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #14303 from taosdata/hotfix/resetquerycache

fix: fix reset query cache crash issue
......@@ -147,14 +147,6 @@ int32_t catalogInit(SCatalogCfg* cfg);
*/
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
/**
* Free a cluster's all catalog info, usually it's not necessary, until the application is closing.
* no current or future usage should be guaranteed by application
* @param pCatalog (input, NO more usage)
* @return error code
*/
void catalogFreeHandle(SCatalog* pCatalog);
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum);
/**
......
......@@ -466,11 +466,11 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CTG_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2400)
#define TSDB_CODE_CTG_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0x2401)
#define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402)
#define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403)
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404)
#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405)
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406)
#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407)
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403)
#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2404)
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2405)
#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2406)
#define TSDB_CODE_CTG_EXIT TAOS_DEF_ERROR_CODE(0, 0x2407)
//scheduler&qworker
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
......
......@@ -205,6 +205,7 @@ typedef struct SCtgJob {
SArray* pTasks;
int32_t taskDone;
SMetaData jobRes;
int32_t jobResCode;
int32_t taskIdx;
SRWLatch taskLock;
......@@ -284,24 +285,26 @@ typedef struct SCtgApiStat {
} SCtgApiStat;
typedef struct SCtgRuntimeStat {
uint64_t qNum;
uint64_t qDoneNum;
uint64_t numOfOpAbort;
uint64_t numOfOpEnqueue;
uint64_t numOfOpDequeue;
} SCtgRuntimeStat;
typedef struct SCtgCacheStat {
uint64_t clusterNum;
uint64_t dbNum;
uint64_t tblNum;
uint64_t stblNum;
uint64_t userNum;
uint64_t vgHitNum;
uint64_t vgMissNum;
uint64_t tbMetaHitNum;
uint64_t tbMetaMissNum;
uint64_t tbIndexHitNum;
uint64_t tbIndexMissNum;
uint64_t userHitNum;
uint64_t userMissNum;
uint64_t numOfCluster;
uint64_t numOfDb;
uint64_t numOfTbl;
uint64_t numOfStb;
uint64_t numOfUser;
uint64_t numOfVgHit;
uint64_t numOfVgMiss;
uint64_t numOfMetaHit;
uint64_t numOfMetaMiss;
uint64_t numOfIndexHit;
uint64_t numOfIndexMiss;
uint64_t numOfUserHit;
uint64_t numOfUserMiss;
uint64_t numOfClear;
} SCtgCacheStat;
typedef struct SCatalogStat {
......@@ -371,6 +374,7 @@ typedef struct SCtgDropTbIndexMsg {
typedef struct SCtgClearCacheMsg {
SCatalog* pCtg;
bool freeCtg;
} SCtgClearCacheMsg;
typedef struct SCtgUpdateEpsetMsg {
......@@ -385,6 +389,7 @@ typedef struct SCtgCacheOperation {
void *data;
bool syncOp;
tsem_t rspSem;
bool stopQueue;
} SCtgCacheOperation;
typedef struct SCtgQNode {
......@@ -394,6 +399,7 @@ typedef struct SCtgQNode {
typedef struct SCtgQueue {
SRWLatch qlock;
bool stopQueue;
SCtgQNode *head;
SCtgQNode *tail;
tsem_t reqSem;
......@@ -513,8 +519,35 @@ typedef struct SCtgOperation {
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_API_LEAVE(c) do { \
int32_t __code = c; \
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); \
CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \
CTG_RET(__code); \
} while (0)
#define CTG_API_ENTER() do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); \
} \
} while (0)
#define CTG_API_LEAVE_NOLOCK(c) do { \
int32_t __code = c; \
CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \
CTG_RET(__code); \
} while (0)
#define CTG_API_ENTER_NOLOCK() do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
CTG_API_LEAVE_NOLOCK(TSDB_CODE_CTG_OUT_OF_SERVICE); \
} \
} while (0)
void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
void ctgdShowClusterCache(SCatalog* pCtg);
......@@ -547,7 +580,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool syncOp);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
......@@ -582,17 +615,18 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const S
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param);
int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param);
int32_t ctgGetTbCfgCb(SCtgTask *pTask);
void ctgFreeHandle(SCatalog* pCatalog);
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput);
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList);
void ctgFreeJob(void* job);
void ctgFreeHandle(SCatalog* pCtg);
void ctgFreeHandleImpl(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
void ctgResetTbMetaTask(SCtgTask* pTask);
......@@ -608,6 +642,8 @@ int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCt
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
void ctgFreeSTableIndex(void *info);
void ctgClearSubTaskRes(SCtgSubRes *pRes);
void ctgFreeQNode(SCtgQNode *node);
void ctgClearHandle(SCatalog* pCtg);
extern SCatalogMgmt gCtgMgmt;
......
......@@ -99,7 +99,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx*
STableMetaOutput* output = taosMemoryCalloc(1, sizeof(STableMetaOutput));
if (NULL == output) {
ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
......@@ -264,7 +264,7 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
STableMetaOutput* output = taosMemoryCalloc(1, sizeof(STableMetaOutput));
if (NULL == output) {
ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t code = 0;
......@@ -442,7 +442,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
vgList = taosArrayInit(1, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
......@@ -548,9 +548,11 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_ENTER();
if (NULL == gCtgMgmt.pCluster) {
qError("catalog cluster cache are not ready, clusterId:0x%" PRIx64, clusterId);
CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
CTG_API_LEAVE(TSDB_CODE_CTG_NOT_READY);
}
int32_t code = 0;
......@@ -562,13 +564,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
if (ctg && (*ctg)) {
*catalogHandle = *ctg;
qDebug("got catalog handle from cache, clusterId:0x%" PRIx64 ", CTG:%p", clusterId, *ctg);
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
clusterCtg = taosMemoryCalloc(1, sizeof(SCatalog));
if (NULL == clusterCtg) {
qError("calloc %d failed", (int32_t)sizeof(SCatalog));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
clusterCtg->clusterId = clusterId;
......@@ -580,13 +582,19 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
false, HASH_ENTRY_LOCK);
if (NULL == clusterCtg->dbCache) {
qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
clusterCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == clusterCtg->userCache) {
qError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
if (code) {
if (HASH_NODE_EXIST(code)) {
ctgFreeHandle(clusterCtg);
ctgFreeHandleImpl(clusterCtg);
continue;
}
......@@ -601,34 +609,15 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
*catalogHandle = clusterCtg;
CTG_CACHE_STAT_INC(clusterNum, 1);
CTG_CACHE_STAT_INC(numOfCluster, 1);
return TSDB_CODE_SUCCESS;
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
_return:
ctgFreeHandle(clusterCtg);
CTG_RET(code);
}
void catalogFreeHandle(SCatalog* pCtg) {
if (NULL == pCtg) {
return;
}
if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:0x%" PRIx64, pCtg->clusterId);
return;
}
CTG_CACHE_STAT_DEC(clusterNum, 1);
ctgFreeHandleImpl(clusterCtg);
uint64_t clusterId = pCtg->clusterId;
ctgFreeHandle(pCtg);
ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId);
CTG_API_LEAVE(code);
}
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum) {
......@@ -1008,7 +997,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalo
pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
if (NULL == pRsp->pTableMeta) {
ctgError("taosArrayInit %d failed", tbNum);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < tbNum; ++i) {
......@@ -1023,7 +1012,7 @@ int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalo
if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
ctgError("taosArrayPush failed, idx:%d", i);
taosMemoryFreeClear(pTableMeta);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
}
......@@ -1058,14 +1047,9 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SC
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0, taskNum = 0;
int32_t code = 0;
SCtgJob *pJob = NULL;
CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, pReq, fp, param, &taskNum));
if (taskNum <= 0) {
SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData));
fp(pMetaData, param, TSDB_CODE_SUCCESS);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgInitJob(pCtg, pConn, &pJob, pReq, fp, param));
CTG_ERR_JRET(ctgLaunchJob(pJob));
......@@ -1073,6 +1057,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SC
// *jobId = pJob->refId;
_return:
if (pJob) {
taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);
......@@ -1274,19 +1259,19 @@ int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
}
int32_t catalogClearCache(void) {
CTG_API_ENTER();
CTG_API_ENTER_NOLOCK();
qInfo("start to clear catalog cache");
if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
CTG_API_LEAVE_NOLOCK(TSDB_CODE_SUCCESS);
}
int32_t code = ctgClearCacheEnqueue(NULL, true);
int32_t code = ctgClearCacheEnqueue(NULL, false, false, true);
qInfo("clear catalog cache end, code: %s", tstrerror(code));
CTG_API_LEAVE(code);
CTG_API_LEAVE_NOLOCK(code);
}
......@@ -1299,32 +1284,12 @@ void catalogDestroy(void) {
atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
if (tsem_post(&gCtgMgmt.queue.reqSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1);
}
CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
SCatalog* pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog**)pIter;
if (pCtg) {
catalogFreeHandle(pCtg);
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
if (!taosCheckCurrentInDll()) {
ctgClearCacheEnqueue(NULL, true, true, true);
}
taosHashCleanup(gCtgMgmt.pCluster);
gCtgMgmt.pCluster = NULL;
if (CTG_IS_LOCKED(&gCtgMgmt.lock) == TD_RWLATCH_WRITE_FLAG_COPY) CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
qInfo("catalog destroyed");
}
......@@ -427,7 +427,7 @@ int32_t ctgInitTask(SCtgJob *pJob, CTG_TASK_TYPE type, void* param, int32_t *tas
return TSDB_CODE_SUCCESS;
}
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param) {
int32_t code = 0;
int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
......@@ -443,11 +443,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
if (*taskNum <= 0) {
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, pConn->requestId);
return TSDB_CODE_SUCCESS;
}
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum + tbCfgNum;
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
if (NULL == *job) {
......@@ -477,15 +473,15 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
pJob->tbCfgNum = tbCfgNum;
pJob->svrVerNum = svrVerNum;
pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask));
if (NULL == pJob->pTasks) {
ctgError("taosArrayInit %d tasks failed", *taskNum);
ctgError("taosArrayInit %d tasks failed", taskNum);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
if (pReq->forceUpdate) {
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, *taskNum, pJob, pReq));
if (pReq->forceUpdate && taskNum) {
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, taskNum, pJob, pReq));
}
for (int32_t i = 0; i < dbVgNum; ++i) {
......@@ -558,11 +554,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, *taskNum, pReq->forceUpdate);
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate);
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(*job);
CTG_RET(code);
}
......@@ -763,7 +760,7 @@ int32_t ctgDumpSvrVer(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgInvokeSubCb(SCtgTask *pTask) {
int32_t ctgCallSubCb(SCtgTask *pTask) {
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &pTask->lock);
......@@ -790,6 +787,15 @@ _return:
CTG_RET(code);
}
int32_t ctgCallUserCb(void* param) {
SCtgJob* pJob = (SCtgJob*)param;
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
return TSDB_CODE_SUCCESS;
}
int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
SCtgJob* pJob = pTask->pJob;
......@@ -804,7 +810,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
pTask->code = rspCode;
pTask->status = CTG_TASK_DONE;
ctgInvokeSubCb(pTask);
ctgCallSubCb(pTask);
int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1);
if (taskDone < taosArrayGetSize(pJob->pTasks)) {
......@@ -818,9 +824,9 @@ _return:
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(code));
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, code);
taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
pJob->jobResCode = code;
taosAsyncExec(ctgCallUserCb, pJob, NULL);
CTG_RET(code);
}
......@@ -1697,6 +1703,12 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
pTask->status = CTG_TASK_LAUNCHED;
}
if (taskNum <= 0) {
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
taosAsyncExec(ctgCallUserCb, pJob, NULL);
}
return TSDB_CODE_SUCCESS;
}
......
此差异已折叠。
......@@ -19,7 +19,7 @@
#include "catalogInt.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {.cacheEnable = true};
SCtgDebug gCTGDebug = {.lockEnable = true, .apiEnable = true};
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1);
......@@ -255,8 +255,8 @@ int32_t ctgdEnableDebug(char *option) {
}
int32_t ctgdGetStatNum(char *option, void *res) {
if (0 == strcasecmp(option, "runtime.qDoneNum")) {
*(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.qDoneNum);
if (0 == strcasecmp(option, "runtime.numOfOpDequeue")) {
*(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.numOfOpDequeue);
return TSDB_CODE_SUCCESS;
}
......
......@@ -52,6 +52,10 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
}
}
void ctgFreeQNode(SCtgQNode *node) {
//TODO
}
void ctgFreeSTableIndex(void *info) {
if (NULL == info) {
return;
......@@ -141,10 +145,10 @@ void ctgFreeStbMetaCache(SCtgDBCache *dbCache) {
return;
}
int32_t stblNum = taosHashGetSize(dbCache->stbCache);
int32_t stbNum = taosHashGetSize(dbCache->stbCache);
taosHashCleanup(dbCache->stbCache);
dbCache->stbCache = NULL;
CTG_CACHE_STAT_DEC(stblNum, stblNum);
CTG_CACHE_STAT_DEC(numOfStb, stbNum);
}
void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
......@@ -168,7 +172,7 @@ void ctgFreeTbCache(SCtgDBCache *dbCache) {
}
taosHashCleanup(dbCache->tbCache);
dbCache->tbCache = NULL;
CTG_CACHE_STAT_DEC(tblNum, tblNum);
CTG_CACHE_STAT_DEC(numOfTbl, tblNum);
}
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
......@@ -198,45 +202,108 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) {
ctgFreeTbCache(dbCache);
}
void ctgFreeInstDbCache(SHashObj* pDbCache) {
if (NULL == pDbCache) {
return;
}
int32_t dbNum = taosHashGetSize(pDbCache);
void *pIter = taosHashIterate(pDbCache, NULL);
while (pIter) {
SCtgDBCache *dbCache = pIter;
atomic_store_8(&dbCache->deleted, 1);
ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pDbCache, pIter);
}
taosHashCleanup(pDbCache);
CTG_CACHE_STAT_DEC(numOfDb, dbNum);
}
void ctgFreeHandle(SCatalog* pCtg) {
void ctgFreeInstUserCache(SHashObj* pUserCache) {
if (NULL == pUserCache) {
return;
}
int32_t userNum = taosHashGetSize(pUserCache);
void *pIter = taosHashIterate(pUserCache, NULL);
while (pIter) {
SCtgUserAuth *userCache = pIter;
ctgFreeSCtgUserAuth(userCache);
pIter = taosHashIterate(pUserCache, pIter);
}
taosHashCleanup(pUserCache);
CTG_CACHE_STAT_DEC(numOfUser, userNum);
}
void ctgFreeHandleImpl(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
if (pCtg->dbCache) {
int32_t dbNum = taosHashGetSize(pCtg->dbCache);
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
SCtgDBCache *dbCache = pIter;
atomic_store_8(&dbCache->deleted, 1);
ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pCtg->dbCache, pIter);
}
taosHashCleanup(pCtg->dbCache);
CTG_CACHE_STAT_DEC(dbNum, dbNum);
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
taosMemoryFree(pCtg);
}
void ctgFreeHandle(SCatalog* pCtg) {
if (NULL == pCtg) {
return;
}
if (pCtg->userCache) {
int32_t userNum = taosHashGetSize(pCtg->userCache);
uint64_t clusterId = pCtg->clusterId;
void *pIter = taosHashIterate(pCtg->userCache, NULL);
while (pIter) {
SCtgUserAuth *userCache = pIter;
ctgFreeSCtgUserAuth(userCache);
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
pIter = taosHashIterate(pCtg->userCache, pIter);
}
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
taosHashCleanup(pCtg->userCache);
CTG_CACHE_STAT_DEC(userNum, userNum);
}
CTG_CACHE_STAT_DEC(numOfCluster, 1);
taosMemoryFree(pCtg);
ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId);
}
void ctgClearHandle(SCatalog* pCtg) {
if (NULL == pCtg) {
return;
}
uint64_t clusterId = pCtg->clusterId;
ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent);
ctgFreeInstDbCache(pCtg->dbCache);
ctgFreeInstUserCache(pCtg->userCache);
ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB);
ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE);
pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pCtg->dbCache) {
qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
}
pCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pCtg->userCache) {
ctgError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum);
}
CTG_CACHE_STAT_INC(numOfClear, 1);
ctgInfo("handle cleared, culsterId:0x%" PRIx64, clusterId);
}
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
if (NULL == pOutput) {
......@@ -590,7 +657,7 @@ int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
if (NULL == vgList) {
ctgError("taosArrayInit failed, num:%d", vgNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
void *pIter = taosHashIterate(vgHash, NULL);
......@@ -600,7 +667,7 @@ int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
if (NULL == taosArrayPush(vgList, vgInfo)) {
ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
taosHashCancelIterate(vgHash, pIter);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pIter = taosHashIterate(vgHash, pIter);
......@@ -717,7 +784,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
*dst = taosMemoryMalloc(sizeof(SDBVgInfo));
if (NULL == *dst) {
qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(*dst, src, sizeof(SDBVgInfo));
......@@ -727,7 +794,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
if (NULL == (*dst)->vgHash) {
qError("taosHashInit %d failed", (int32_t)hashSize);
taosMemoryFreeClear(*dst);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t *vgId = NULL;
......@@ -740,7 +807,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
taosHashCancelIterate(src->vgHash, pIter);
taosHashCleanup((*dst)->vgHash);
taosMemoryFreeClear(*dst);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pIter = taosHashIterate(src->vgHash, pIter);
......@@ -756,7 +823,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
*pOutput = taosMemoryMalloc(sizeof(STableMetaOutput));
if (NULL == *pOutput) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(*pOutput, output, sizeof(STableMetaOutput));
......@@ -767,7 +834,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
if (NULL == (*pOutput)->tbMeta) {
qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
taosMemoryFreeClear(*pOutput);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);
......
......@@ -1386,7 +1386,7 @@ TEST(tableMeta, updateStbMeta) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n != 3) {
taosMsleep(50);
} else {
......@@ -1456,7 +1456,7 @@ TEST(refreshGetMeta, normal2normal) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1535,7 +1535,7 @@ TEST(refreshGetMeta, normal2notexist) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1609,7 +1609,7 @@ TEST(refreshGetMeta, normal2child) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1693,7 +1693,7 @@ TEST(refreshGetMeta, stable2child) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1778,7 +1778,7 @@ TEST(refreshGetMeta, stable2stable) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -1866,7 +1866,7 @@ TEST(refreshGetMeta, child2stable) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -2083,7 +2083,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n > 0) {
break;
}
......@@ -2109,7 +2109,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
while (true) {
uint64_t n = 0;
ctgdGetStatNum("runtime.qDoneNum", (void *)&n);
ctgdGetStatNum("runtime.numOfOpDequeue", (void *)&n);
if (n != 3) {
taosMsleep(50);
} else {
......
......@@ -420,7 +420,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
taosHashCancelIterate(pSrc->vgHash, pIter);
taosHashCleanup((*pDst)->vgHash);
taosMemoryFreeClear(*pDst);
return TSDB_CODE_CTG_MEM_ERROR;
return TSDB_CODE_OUT_OF_MEMORY;
}
pIter = taosHashIterate(pSrc->vgHash, pIter);
......
......@@ -453,11 +453,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FS_NO_VALID_DISK, "tfs no valid disk")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INTERNAL_ERROR, "catalog internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input parameters")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_MEMORY, "catalog memory error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_EXIT, "catalog exit")
//scheduler
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
......
......@@ -44,4 +44,4 @@ if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
system sh/exec.sh -n dnode1 -s stop -x SIGINT
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册