提交 ff1f6f3b 编写于 作者: D dapan1121

fix: fix reset query cache issue

上级 be341e1d
...@@ -145,14 +145,6 @@ int32_t catalogInit(SCatalogCfg* cfg); ...@@ -145,14 +145,6 @@ int32_t catalogInit(SCatalogCfg* cfg);
*/ */
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle); int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
/**
* Free a cluster's all catalog info, usually it's not necessary, until the application is closing.
* no current or future usage should be guaranteed by application
* @param pCatalog (input, NO more usage)
* @return error code
*/
void catalogFreeHandle(SCatalog* pCatalog);
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum); int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum);
/** /**
......
...@@ -383,7 +383,7 @@ typedef struct SCtgCacheOperation { ...@@ -383,7 +383,7 @@ typedef struct SCtgCacheOperation {
void *data; void *data;
bool syncOp; bool syncOp;
tsem_t rspSem; tsem_t rspSem;
bool lockQ; bool stopQueue;
} SCtgCacheOperation; } SCtgCacheOperation;
typedef struct SCtgQNode { typedef struct SCtgQNode {
...@@ -393,7 +393,7 @@ typedef struct SCtgQNode { ...@@ -393,7 +393,7 @@ typedef struct SCtgQNode {
typedef struct SCtgQueue { typedef struct SCtgQueue {
SRWLatch qlock; SRWLatch qlock;
bool lockQ; bool stopQueue;
SCtgQNode *head; SCtgQNode *head;
SCtgQNode *tail; SCtgQNode *tail;
tsem_t reqSem; tsem_t reqSem;
...@@ -559,7 +559,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy ...@@ -559,7 +559,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq); int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet); int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp); int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool lockQ, bool syncOp); int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type); int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size); int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size); int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
...@@ -598,12 +598,13 @@ int32_t ctgLaunchJob(SCtgJob *pJob); ...@@ -598,12 +598,13 @@ int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob); int32_t ctgMakeAsyncRes(SCtgJob *pJob);
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param); int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param);
int32_t ctgGetTbCfgCb(SCtgTask *pTask); int32_t ctgGetTbCfgCb(SCtgTask *pTask);
void ctgFreeHandle(SCatalog* pCatalog);
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst); int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput); int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput);
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList); int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList);
void ctgFreeJob(void* job); void ctgFreeJob(void* job);
void ctgFreeHandle(SCatalog* pCtg); void ctgFreeHandleImpl(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo); void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup); int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
void ctgResetTbMetaTask(SCtgTask* pTask); void ctgResetTbMetaTask(SCtgTask* pTask);
......
...@@ -548,9 +548,11 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { ...@@ -548,9 +548,11 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
} }
CTG_API_ENTER();
if (NULL == gCtgMgmt.pCluster) { if (NULL == gCtgMgmt.pCluster) {
qError("catalog cluster cache are not ready, clusterId:0x%" PRIx64, clusterId); qError("catalog cluster cache are not ready, clusterId:0x%" PRIx64, clusterId);
CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY); CTG_API_LEAVE(TSDB_CODE_CTG_NOT_READY);
} }
int32_t code = 0; int32_t code = 0;
...@@ -562,13 +564,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { ...@@ -562,13 +564,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
if (ctg && (*ctg)) { if (ctg && (*ctg)) {
*catalogHandle = *ctg; *catalogHandle = *ctg;
qDebug("got catalog handle from cache, clusterId:0x%" PRIx64 ", CTG:%p", clusterId, *ctg); qDebug("got catalog handle from cache, clusterId:0x%" PRIx64 ", CTG:%p", clusterId, *ctg);
return TSDB_CODE_SUCCESS; CTG_API_LEAVE(TSDB_CODE_SUCCESS);
} }
clusterCtg = taosMemoryCalloc(1, sizeof(SCatalog)); clusterCtg = taosMemoryCalloc(1, sizeof(SCatalog));
if (NULL == clusterCtg) { if (NULL == clusterCtg) {
qError("calloc %d failed", (int32_t)sizeof(SCatalog)); qError("calloc %d failed", (int32_t)sizeof(SCatalog));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
} }
clusterCtg->clusterId = clusterId; clusterCtg->clusterId = clusterId;
...@@ -586,7 +588,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { ...@@ -586,7 +588,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES); code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
if (code) { if (code) {
if (HASH_NODE_EXIST(code)) { if (HASH_NODE_EXIST(code)) {
ctgFreeHandle(clusterCtg); ctgFreeHandleImpl(clusterCtg);
continue; continue;
} }
...@@ -603,32 +605,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) { ...@@ -603,32 +605,13 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
CTG_CACHE_STAT_INC(clusterNum, 1); CTG_CACHE_STAT_INC(clusterNum, 1);
return TSDB_CODE_SUCCESS; CTG_API_LEAVE(TSDB_CODE_SUCCESS);
_return: _return:
ctgFreeHandle(clusterCtg); ctgFreeHandleImpl(clusterCtg);
CTG_RET(code);
}
void catalogFreeHandle(SCatalog* pCtg) {
if (NULL == pCtg) {
return;
}
if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) { CTG_API_LEAVE(code);
ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:0x%" PRIx64, pCtg->clusterId);
return;
}
CTG_CACHE_STAT_DEC(clusterNum, 1);
uint64_t clusterId = pCtg->clusterId;
ctgFreeHandle(pCtg);
ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId);
} }
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum) { int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum) {
...@@ -1285,10 +1268,6 @@ void catalogDestroy(void) { ...@@ -1285,10 +1268,6 @@ void catalogDestroy(void) {
ctgClearCacheEnqueue(NULL, true, true); ctgClearCacheEnqueue(NULL, true, true);
if (tsem_post(&gCtgMgmt.queue.reqSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
taosHashCleanup(gCtgMgmt.pCluster); taosHashCleanup(gCtgMgmt.pCluster);
gCtgMgmt.pCluster = NULL; gCtgMgmt.pCluster = NULL;
......
...@@ -659,12 +659,12 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { ...@@ -659,12 +659,12 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
node->op = operation; node->op = operation;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
if (gCtgMgmt.queue.lockQ) { if (gCtgMgmt.queue.stopQueue) {
ctgFreeQNode(node); ctgFreeQNode(node);
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
CTG_RET(TSDB_CODE_CTG_EXIT); CTG_RET(TSDB_CODE_CTG_EXIT);
} }
gCtgMgmt.queue.lockQ = operation->lockQ; gCtgMgmt.queue.stopQueue = operation->stopQueue;
gCtgMgmt.queue.tail->next = node; gCtgMgmt.queue.tail->next = node;
gCtgMgmt.queue.tail = node; gCtgMgmt.queue.tail = node;
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
...@@ -1002,12 +1002,12 @@ _return: ...@@ -1002,12 +1002,12 @@ _return:
} }
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool lockQ, bool syncOp) { int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool stopQueue, bool syncOp) {
int32_t code = 0; int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_CLEAR_CACHE; op->opId = CTG_OP_CLEAR_CACHE;
op->syncOp = syncOp; op->syncOp = syncOp;
op->lockQ = lockQ; op->stopQueue = stopQueue;
SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg)); SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
if (NULL == msg) { if (NULL == msg) {
......
...@@ -196,7 +196,7 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) { ...@@ -196,7 +196,7 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) {
} }
void ctgFreeHandle(SCatalog* pCtg) { void ctgFreeHandleImpl(SCatalog* pCtg) {
ctgFreeMetaRent(&pCtg->dbRent); ctgFreeMetaRent(&pCtg->dbRent);
ctgFreeMetaRent(&pCtg->stbRent); ctgFreeMetaRent(&pCtg->stbRent);
...@@ -235,6 +235,27 @@ void ctgFreeHandle(SCatalog* pCtg) { ...@@ -235,6 +235,27 @@ void ctgFreeHandle(SCatalog* pCtg) {
} }
void ctgFreeHandle(SCatalog* pCtg) {
if (NULL == pCtg) {
return;
}
if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:0x%" PRIx64, pCtg->clusterId);
return;
}
CTG_CACHE_STAT_DEC(clusterNum, 1);
uint64_t clusterId = pCtg->clusterId;
ctgFreeHandleImpl(pCtg);
ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId);
}
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) { void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
if (NULL == pOutput) { if (NULL == pOutput) {
return; return;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册