提交 9cee07bf 编写于 作者: D dapan1121

fix: fix reset query cache crash issue

上级 54cb6b3e
......@@ -470,6 +470,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405)
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406)
#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407)
#define TSDB_CODE_CTG_EXIT TAOS_DEF_ERROR_CODE(0, 0x2408)
//scheduler&qworker
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
......
......@@ -381,6 +381,7 @@ typedef struct SCtgCacheOperation {
void *data;
bool syncOp;
tsem_t rspSem;
bool lockQ;
} SCtgCacheOperation;
typedef struct SCtgQNode {
......@@ -390,6 +391,7 @@ typedef struct SCtgQNode {
typedef struct SCtgQueue {
SRWLatch qlock;
bool lockQ;
SCtgQNode *head;
SCtgQNode *tail;
tsem_t reqSem;
......@@ -509,8 +511,20 @@ typedef struct SCtgOperation {
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
#define CTG_API_LEAVE(c) do { \
int32_t __code = c; \
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); \
CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \
CTG_RET(__code); \
} while (0)
#define CTG_API_ENTER() do { \
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); \
} \
} while (0)
void ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
void ctgdShowClusterCache(SCatalog* pCtg);
......@@ -543,7 +557,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool syncOp);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool lockQ, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
......
......@@ -1255,7 +1255,7 @@ int32_t catalogClearCache(void) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t code = ctgClearCacheEnqueue(NULL, true);
int32_t code = ctgClearCacheEnqueue(NULL, false, true);
qInfo("clear catalog cache end, code: %s", tstrerror(code));
......@@ -1272,32 +1272,14 @@ void catalogDestroy(void) {
atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
ctgClearCacheEnqueue(NULL, true, true);
if (tsem_post(&gCtgMgmt.queue.reqSem)) {
qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
taosUsleep(1);
}
CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
SCatalog* pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog**)pIter;
if (pCtg) {
catalogFreeHandle(pCtg);
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
}
taosHashCleanup(gCtgMgmt.pCluster);
gCtgMgmt.pCluster = NULL;
if (CTG_IS_LOCKED(&gCtgMgmt.lock) == TD_RWLATCH_WRITE_FLAG_COPY) CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
qInfo("catalog destroyed");
}
......@@ -659,6 +659,12 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
node->op = operation;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
if (gCtgMgmt.queue.lockQ) {
ctgFreeQNode(node);
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
CTG_RET(TSDB_CODE_CTG_EXIT);
}
gCtgMgmt.queue.lockQ = operation->lockQ;
gCtgMgmt.queue.tail->next = node;
gCtgMgmt.queue.tail = node;
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
......@@ -996,11 +1002,12 @@ _return:
}
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool syncOp) {
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool lockQ, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_CLEAR_CACHE;
op->syncOp = syncOp;
op->lockQ = lockQ;
SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
if (NULL == msg) {
......@@ -1520,6 +1527,24 @@ _return:
CTG_RET(code);
}
void ctgClearAllCtgInstance(void) {
SCatalog* pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog**)pIter;
if (pCtg) {
catalogFreeHandle(pCtg);
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
}
taosHashClear(gCtgMgmt.pCluster);
}
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateVgMsg *msg = operation->data;
......@@ -1942,18 +1967,7 @@ int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
goto _return;
}
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog**)pIter;
if (pCtg) {
catalogFreeHandle(pCtg);
}
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
}
taosHashClear(gCtgMgmt.pCluster);
ctgClearAllCtgInstance();
_return:
......@@ -1962,11 +1976,6 @@ _return:
CTG_RET(code);
}
void ctgUpdateThreadUnexpectedStopped(void) {
if (!atomic_load_8((int8_t*)&gCtgMgmt.exit) && CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
}
void ctgCleanupCacheQueue(void) {
SCtgQNode *node = NULL;
SCtgQNode *nodeNext = NULL;
......@@ -2002,22 +2011,15 @@ void ctgCleanupCacheQueue(void) {
void* ctgUpdateThreadFunc(void* param) {
setThreadName("catalog");
#ifdef WINDOWS
if (taosCheckCurrentInDll()) {
atexit(ctgUpdateThreadUnexpectedStopped);
}
#endif
qInfo("catalog update thread started");
CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
while (true) {
if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
ctgCleanupCacheQueue();
break;
}
......
......@@ -48,6 +48,10 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
}
}
void ctgFreeQNode(SCtgQNode *node) {
//TODO
}
void ctgFreeSTableIndex(void *info) {
if (NULL == info) {
return;
......
......@@ -1523,11 +1523,13 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
int64_t startTs = taosGetTimestampUs();
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
/*
if (pInfo->showRewrite) {
char dbName[TSDB_DB_NAME_LEN] = {0};
getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
}
*/
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);
char* buf1 = taosMemoryCalloc(1, contLen);
......
......@@ -457,6 +457,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error"
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup mismatch")
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_EXIT, "catalog exit")
//scheduler
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册