提交 965cc65b 编写于 作者: D dapan1121

fix: fix catalog deaklock issue

上级 2a09e003
...@@ -679,6 +679,8 @@ void ctgClearSubTaskRes(SCtgSubRes *pRes); ...@@ -679,6 +679,8 @@ void ctgClearSubTaskRes(SCtgSubRes *pRes);
void ctgFreeQNode(SCtgQNode *node); void ctgFreeQNode(SCtgQNode *node);
void ctgClearHandle(SCatalog* pCtg); void ctgClearHandle(SCatalog* pCtg);
void ctgFreeTbCacheImpl(SCtgTbCache *pCache); void ctgFreeTbCacheImpl(SCtgTbCache *pCache);
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup);
extern SCatalogMgmt gCtgMgmt; extern SCatalogMgmt gCtgMgmt;
......
...@@ -92,7 +92,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ...@@ -92,7 +92,7 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx*
int32_t code = 0; int32_t code = 0;
if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) { if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) {
CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo)); CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, ctx->pName, &vgroupInfo));
} }
STableMetaOutput moutput = {0}; STableMetaOutput moutput = {0};
...@@ -337,7 +337,10 @@ int32_t ctgGetTbType(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, ...@@ -337,7 +337,10 @@ int32_t ctgGetTbType(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName,
} }
STableMeta* pMeta = NULL; STableMeta* pMeta = NULL;
CTG_ERR_RET(catalogGetTableMeta(pCtg, pConn, pTableName, &pMeta)); SCtgTbMetaCtx ctx = {0};
ctx.pName = (SName*)pTableName;
ctx.flag = CTG_FLAG_UNKNOWN_STB;
CTG_ERR_RET(ctgGetTbMeta(pCtg, pConn, &ctx, &pMeta));
*tbType = pMeta->tableType; *tbType = pMeta->tableType;
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
...@@ -391,7 +394,7 @@ int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, ...@@ -391,7 +394,7 @@ int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName,
CTG_ERR_RET(ctgGetTableCfgFromMnode(pCtg, pConn, pTableName, pCfg, NULL)); CTG_ERR_RET(ctgGetTableCfgFromMnode(pCtg, pConn, pTableName, pCfg, NULL));
} else { } else {
SVgroupInfo vgroupInfo = {0}; SVgroupInfo vgroupInfo = {0};
CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pConn, pTableName, &vgroupInfo)); CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, pTableName, &vgroupInfo));
CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, pCfg, NULL)); CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, pCfg, NULL));
} }
...@@ -477,6 +480,57 @@ _return: ...@@ -477,6 +480,57 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup) {
if (IS_SYS_DBNAME(pTableName->dbname)) {
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
_return:
if (dbCache) {
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
if (vgInfo) {
taosHashCleanup(vgInfo->vgHash);
taosMemoryFreeClear(vgInfo);
}
CTG_RET(code);
}
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName) {
int32_t code = 0;
if (NULL == pCtg || NULL == pTableName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pCtg->dbCache) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_JRET(ctgRemoveTbMetaFromCache(pCtg, pTableName, true));
_return:
CTG_RET(code);
}
int32_t catalogInit(SCatalogCfg* cfg) { int32_t catalogInit(SCatalogCfg* cfg) {
if (gCtgMgmt.pCluster) { if (gCtgMgmt.pCluster) {
qError("catalog already initialized"); qError("catalog already initialized");
...@@ -772,21 +826,7 @@ _return: ...@@ -772,21 +826,7 @@ _return:
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
CTG_API_ENTER(); CTG_API_ENTER();
int32_t code = 0; CTG_API_LEAVE(ctgRemoveTbMeta(pCtg, pTableName));
if (NULL == pCtg || NULL == pTableName) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
if (NULL == pCtg->dbCache) {
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgRemoveTbMetaFromCache(pCtg, pTableName, true));
_return:
CTG_API_LEAVE(code);
} }
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) { int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) {
...@@ -878,12 +918,12 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo *pConn, SArray* ...@@ -878,12 +918,12 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, SRequestConnInfo *pConn, SArray*
case TSDB_CHILD_TABLE: { case TSDB_CHILD_TABLE: {
SName stb = name; SName stb = name;
strcpy(stb.tname, stbName); strcpy(stb.tname, stbName);
catalogRemoveTableMeta(pCtg, &stb); ctgRemoveTbMeta(pCtg, &stb);
break; break;
} }
case TSDB_SUPER_TABLE: case TSDB_SUPER_TABLE:
case TSDB_NORMAL_TABLE: case TSDB_NORMAL_TABLE:
catalogRemoveTableMeta(pCtg, &name); ctgRemoveTbMeta(pCtg, &name);
break; break;
default: default:
ctgError("ignore table type %d", tbType); ctgError("ignore table type %d", tbType);
...@@ -947,34 +987,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -947,34 +987,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup) { int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup) {
CTG_API_ENTER(); CTG_API_ENTER();
if (IS_SYS_DBNAME(pTableName->dbname)) { CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, pConn, pTableName, pVgroup));
ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgDBCache* dbCache = NULL;
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
_return:
if (dbCache) {
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
if (vgInfo) {
taosHashCleanup(vgInfo->vgHash);
taosMemoryFreeClear(vgInfo);
}
CTG_API_LEAVE(code);
} }
int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalogReq* pReq, SMetaData* pRsp) { int32_t catalogGetAllMeta(SCatalog* pCtg, SRequestConnInfo *pConn, const SCatalogReq* pReq, SMetaData* pRsp) {
...@@ -1200,7 +1213,7 @@ int32_t catalogRefreshGetTableCfg(SCatalog* pCtg, SRequestConnInfo *pConn, const ...@@ -1200,7 +1213,7 @@ int32_t catalogRefreshGetTableCfg(SCatalog* pCtg, SRequestConnInfo *pConn, const
} }
int32_t code = 0; int32_t code = 0;
CTG_ERR_JRET(catalogRemoveTableMeta(pCtg, (SName*)pTableName)); CTG_ERR_JRET(ctgRemoveTbMeta(pCtg, (SName*)pTableName));
CTG_ERR_JRET(ctgGetTbCfg(pCtg, pConn, (SName*)pTableName, pCfg)); CTG_ERR_JRET(ctgGetTbCfg(pCtg, pConn, (SName*)pTableName, pCfg));
......
...@@ -398,7 +398,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con ...@@ -398,7 +398,7 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
SName* name = taosHashIterate(pTb, NULL); SName* name = taosHashIterate(pTb, NULL);
while (name) { while (name) {
catalogRemoveTableMeta(pCtg, name); ctgRemoveTbMeta(pCtg, name);
name = taosHashIterate(pTb, name); name = taosHashIterate(pTb, name);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册