diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index 6935489ff4000aa37aed912769120713102ed4c1..ee0577523d73d6afce6f125c65da6f867068fa0a 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -13,96 +13,46 @@ * along with this program. If not, see . */ -#include "trpc.h" -#include "query.h" -#include "tname.h" #include "catalogInt.h" +#include "query.h" #include "systable.h" +#include "tname.h" +#include "trpc.h" -SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = { - { - CTG_OP_UPDATE_VGROUP, - "update vgInfo", - ctgOpUpdateVgroup - }, - { - CTG_OP_UPDATE_TB_META, - "update tbMeta", - ctgOpUpdateTbMeta - }, - { - CTG_OP_DROP_DB_CACHE, - "drop DB", - ctgOpDropDbCache - }, - { - CTG_OP_DROP_DB_VGROUP, - "drop DBVgroup", - ctgOpDropDbVgroup - }, - { - CTG_OP_DROP_STB_META, - "drop stbMeta", - ctgOpDropStbMeta - }, - { - CTG_OP_DROP_TB_META, - "drop tbMeta", - ctgOpDropTbMeta - }, - { - CTG_OP_UPDATE_USER, - "update user", - ctgOpUpdateUser - }, - { - CTG_OP_UPDATE_VG_EPSET, - "update epset", - ctgOpUpdateEpset - }, - { - CTG_OP_UPDATE_TB_INDEX, - "update tbIndex", - ctgOpUpdateTbIndex - }, - { - CTG_OP_DROP_TB_INDEX, - "drop tbIndex", - ctgOpDropTbIndex - }, - { - CTG_OP_CLEAR_CACHE, - "clear cache", - ctgOpClearCache - } -}; - - - +SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup}, + {CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta}, + {CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache}, + {CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup}, + {CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta}, + {CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta}, + {CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser}, + {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset}, + {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex}, + {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex}, + {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}}; int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) { CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock); - + if (dbCache->deleted) { CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); - ctgDebug("db is dropping, dbId:0x%"PRIx64, dbCache->dbId); - + ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId); + *inCache = false; return TSDB_CODE_SUCCESS; } - if (NULL == dbCache->vgCache.vgInfo) { CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); *inCache = false; - ctgDebug("db vgInfo is empty, dbId:0x%"PRIx64, dbCache->dbId); + ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId); return TSDB_CODE_SUCCESS; } *inCache = true; - + return TSDB_CODE_SUCCESS; } @@ -110,7 +60,7 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) { CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock); if (dbCache->deleted) { - ctgDebug("db is dropping, dbId:0x%"PRIx64, dbCache->dbId); + ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId); CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock); CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); } @@ -118,19 +68,13 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) { return TSDB_CODE_SUCCESS; } -void ctgRUnlockVgInfo(SCtgDBCache *dbCache) { - CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); -} +void ctgRUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); } -void ctgWUnlockVgInfo(SCtgDBCache *dbCache) { - CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock); -} +void ctgWUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock); } -void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) { - CTG_UNLOCK(CTG_READ, &dbCache->dbLock); -} +void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->dbLock); } -int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { +int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) { char *p = strchr(dbFName, '.'); if (p && IS_SYS_DBNAME(p + 1)) { dbFName = p + 1; @@ -150,35 +94,35 @@ int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache * if (dbCache->deleted) { if (acquire) { ctgReleaseDBCache(pCtg, dbCache); - } - + } + *pCache = NULL; ctgDebug("db is removing from cache, dbFName:%s", dbFName); return TSDB_CODE_SUCCESS; } *pCache = dbCache; - + return TSDB_CODE_SUCCESS; } -int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) { +int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) { CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true)); } -int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) { +int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) { CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false)); } -void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache *dbCache) { +void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) { ctgRUnlockVgInfo(dbCache); ctgReleaseDBCache(pCtg, dbCache); } -void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* pCache) { +void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) { if (pCache) { CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); + taosHashRelease(dbCache->tbCache, pCache); } if (dbCache) { @@ -186,10 +130,10 @@ void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* } } -void ctgReleaseTbIndexToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* pCache) { +void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) { if (pCache) { CTG_UNLOCK(CTG_READ, &pCache->indexLock); - taosHashRelease(dbCache->tbCache, pCache); + taosHashRelease(dbCache->tbCache, pCache); } if (dbCache) { @@ -197,10 +141,10 @@ void ctgReleaseTbIndexToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* } } -int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) { +int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) { SCtgDBCache *dbCache = NULL; ctgAcquireDBCache(pCtg, dbFName, &dbCache); - if (NULL == dbCache) { + if (NULL == dbCache) { ctgDebug("db %s not in cache", dbFName); goto _return; } @@ -217,7 +161,7 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac CTG_CACHE_STAT_INC(numOfVgHit, 1); ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName); - + return TSDB_CODE_SUCCESS; _return: @@ -229,19 +173,19 @@ _return: *pCache = NULL; CTG_CACHE_STAT_INC(numOfVgMiss, 1); - + return TSDB_CODE_SUCCESS; } -int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, SCtgDBCache **pDb, SCtgTbCache** pTb) { +int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) { SCtgDBCache *dbCache = NULL; - SCtgTbCache* pCache = NULL; + SCtgTbCache *pCache = NULL; ctgAcquireDBCache(pCtg, dbFName, &dbCache); if (NULL == dbCache) { ctgDebug("db %s not in cache", dbFName); goto _return; } - + pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName)); if (NULL == pCache) { ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName); @@ -258,7 +202,7 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S *pTb = pCache; ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName); - + CTG_CACHE_STAT_INC(numOfMetaHit, 1); return TSDB_CODE_SUCCESS; @@ -268,20 +212,20 @@ _return: ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); CTG_CACHE_STAT_INC(numOfMetaMiss, 1); - + return TSDB_CODE_SUCCESS; } -int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache** pTb) { - SCtgDBCache* dbCache = NULL; - SCtgTbCache* pCache = NULL; +int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) { + SCtgDBCache *dbCache = NULL; + SCtgTbCache *pCache = NULL; ctgAcquireDBCache(pCtg, dbFName, &dbCache); if (NULL == dbCache) { ctgDebug("db %s not in cache", dbFName); goto _return; } - - char* stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid)); + + char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid)); if (NULL == stName) { ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName); goto _return; @@ -304,7 +248,7 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid, *pTb = pCache; ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName); - + CTG_CACHE_STAT_INC(numOfMetaHit, 1); return TSDB_CODE_SUCCESS; @@ -317,20 +261,19 @@ _return: *pDb = NULL; *pTb = NULL; - + return TSDB_CODE_SUCCESS; } - -int32_t ctgAcquireTbIndexFromCache(SCatalog* pCtg, char *dbFName, char* tbName, SCtgDBCache **pDb, SCtgTbCache** pTb) { +int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) { SCtgDBCache *dbCache = NULL; - SCtgTbCache* pCache = NULL; + SCtgTbCache *pCache = NULL; ctgAcquireDBCache(pCtg, dbFName, &dbCache); if (NULL == dbCache) { ctgDebug("db %s not in cache", dbFName); goto _return; } - + int32_t sz = 0; pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName)); if (NULL == pCache) { @@ -348,7 +291,7 @@ int32_t ctgAcquireTbIndexFromCache(SCatalog* pCtg, char *dbFName, char* tbName, *pTb = pCache; ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName); - + CTG_CACHE_STAT_INC(numOfIndexHit, 1); return TSDB_CODE_SUCCESS; @@ -358,32 +301,31 @@ _return: ctgReleaseTbIndexToCache(pCtg, dbCache, pCache); CTG_CACHE_STAT_INC(numOfIndexMiss, 1); - + return TSDB_CODE_SUCCESS; } - -int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) { +int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) { SCtgDBCache *dbCache = NULL; SCtgTbCache *tbCache = NULL; ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache); if (NULL == tbCache) { ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - + *exist = 0; return TSDB_CODE_SUCCESS; } *exist = 1; ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - + return TSDB_CODE_SUCCESS; } -int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { - int32_t code = 0; +int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) { + int32_t code = 0; SCtgDBCache *dbCache = NULL; - SCtgTbCache *tbCache = NULL; + SCtgTbCache *tbCache = NULL; *pTableMeta = NULL; char dbFName[TSDB_DB_FNAME_LEN] = {0}; @@ -399,12 +341,12 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** return TSDB_CODE_SUCCESS; } - STableMeta* tbMeta = tbCache->pMeta; + STableMeta *tbMeta = tbCache->pMeta; ctx->tbInfo.inCache = true; ctx->tbInfo.dbId = dbCache->dbId; ctx->tbInfo.suid = tbMeta->suid; ctx->tbInfo.tbType = tbMeta->tableType; - + if (tbMeta->tableType != TSDB_CHILD_TABLE) { int32_t metaSize = CTG_META_SIZE(tbMeta); *pTableMeta = taosMemoryCalloc(1, metaSize); @@ -414,14 +356,14 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** } memcpy(*pTableMeta, tbMeta, metaSize); - + ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName); return TSDB_CODE_SUCCESS; } // PROCESS FOR CHILD TABLE - + int32_t metaSize = sizeof(SCTableMeta); *pTableMeta = taosMemoryCalloc(1, metaSize); if (NULL == *pTableMeta) { @@ -429,10 +371,10 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** } memcpy(*pTableMeta, tbMeta, metaSize); - + ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", - ctx->pName->tname, ctx->tbInfo.tbType, dbFName); + ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname, + ctx->tbInfo.tbType, dbFName); ctgAcquireStbMetaFromCache(pCtg, dbFName, ctx->tbInfo.suid, &dbCache, &tbCache); if (NULL == tbCache) { @@ -441,17 +383,17 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid); return TSDB_CODE_SUCCESS; } - - STableMeta* stbMeta = tbCache->pMeta; - if (stbMeta->suid != ctx->tbInfo.suid) { + + STableMeta *stbMeta = tbCache->pMeta; + if (stbMeta->suid != ctx->tbInfo.suid) { ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, ctx->tbInfo.suid); + ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } metaSize = CTG_META_SIZE(stbMeta); *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); - if (NULL == *pTableMeta) { + if (NULL == *pTableMeta) { ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } @@ -461,24 +403,24 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName); - + return TSDB_CODE_SUCCESS; _return: ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); taosMemoryFreeClear(*pTableMeta); - + CTG_RET(code); } -int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, - char *stbName) { +int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, + uint64_t *suid, char *stbName) { *sver = -1; *tver = -1; SCtgDBCache *dbCache = NULL; - SCtgTbCache *tbCache = NULL; + SCtgTbCache *tbCache = NULL; char dbFName[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, dbFName); @@ -488,7 +430,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, return TSDB_CODE_SUCCESS; } - STableMeta* tbMeta = tbCache->pMeta; + STableMeta *tbMeta = tbCache->pMeta; *tbType = tbMeta->tableType; *suid = tbMeta->suid; @@ -496,29 +438,29 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, *sver = tbMeta->sversion; *tver = tbMeta->tversion; - ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, - pTableName->tname, dbFName, *tbType, *sver, *tver, *suid); + ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname, + dbFName, *tbType, *sver, *tver, *suid); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); return TSDB_CODE_SUCCESS; } // PROCESS FOR CHILD TABLE - + ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName); - + ctgAcquireStbMetaFromCache(pCtg, dbFName, *suid, &dbCache, &tbCache); if (NULL == tbCache) { ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid); return TSDB_CODE_SUCCESS; } - - STableMeta* stbMeta = tbCache->pMeta; + + STableMeta *stbMeta = tbCache->pMeta; if (stbMeta->suid != *suid) { ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64 , stbMeta->suid, *suid); + ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -533,15 +475,15 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType, dbFName); + ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType, + dbFName); return TSDB_CODE_SUCCESS; } - -int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tbName, int32_t *tbType) { +int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) { SCtgDBCache *dbCache = NULL; - SCtgTbCache *tbCache = NULL; + SCtgTbCache *tbCache = NULL; CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache)); if (NULL == tbCache) { ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); @@ -551,15 +493,15 @@ int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tbName, int3 *tbType = tbCache->pMeta->tableType; ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); - ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName); - + ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName); + return TSDB_CODE_SUCCESS; } -int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, SName* pTableName, SArray** pRes) { - int32_t code = 0; +int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) { + int32_t code = 0; SCtgDBCache *dbCache = NULL; - SCtgTbCache *tbCache = NULL; + SCtgTbCache *tbCache = NULL; char dbFName[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, dbFName); @@ -580,14 +522,14 @@ _return: CTG_RET(code); } -int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) { +int32_t ctgChkAuthFromCache(SCatalog *pCtg, char *user, char *dbFName, AUTH_TYPE type, bool *inCache, bool *pass) { char *p = strchr(dbFName, '.'); if (p) { ++p; } else { p = dbFName; } - + if (IS_SYS_DBNAME(p)) { *inCache = true; *pass = true; @@ -605,7 +547,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE ctgDebug("Got user from cache, user:%s", user); CTG_CACHE_STAT_INC(numOfUserHit, 1); - + if (pUser->superUser) { *pass = true; return TSDB_CODE_SUCCESS; @@ -617,54 +559,53 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE CTG_UNLOCK(CTG_READ, &pUser->lock); return TSDB_CODE_SUCCESS; } - + if (pUser->readDbs && taosHashGet(pUser->readDbs, dbFName, strlen(dbFName)) && type == AUTH_TYPE_READ) { *pass = true; } - + if (pUser->writeDbs && taosHashGet(pUser->writeDbs, dbFName, strlen(dbFName)) && type == AUTH_TYPE_WRITE) { *pass = true; } CTG_UNLOCK(CTG_READ, &pUser->lock); - + return TSDB_CODE_SUCCESS; _return: *inCache = false; CTG_CACHE_STAT_INC(numOfUserMiss, 1); - + return TSDB_CODE_SUCCESS; } void ctgDequeue(SCtgCacheOperation **op) { SCtgQNode *orig = gCtgMgmt.queue.head; - + SCtgQNode *node = gCtgMgmt.queue.head->next; gCtgMgmt.queue.head = gCtgMgmt.queue.head->next; CTG_QUEUE_DEC(); - + taosMemoryFreeClear(orig); *op = node->op; } - -int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { +int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) { SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode)); if (NULL == node) { qError("calloc %d failed", (int32_t)sizeof(SCtgQNode)); CTG_RET(TSDB_CODE_OUT_OF_MEMORY); } - bool syncOp = operation->syncOp; - char* opName = gCtgCacheOperation[operation->opId].name; + bool syncOp = operation->syncOp; + char *opName = gCtgCacheOperation[operation->opId].name; if (operation->syncOp) { tsem_init(&operation->rspSem, 0, 0); } - + node->op = operation; CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock); @@ -699,12 +640,11 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) { return TSDB_CODE_SUCCESS; } - -int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) { - int32_t code = 0; +int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_DROP_DB_CACHE; - + SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg)); @@ -732,12 +672,12 @@ _return: CTG_RET(code); } -int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) { - int32_t code = 0; +int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_DROP_DB_VGROUP; op->syncOp = syncOp; - + SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg)); @@ -764,14 +704,13 @@ _return: CTG_RET(code); } - - -int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) { - int32_t code = 0; +int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, + bool syncOp) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_DROP_STB_META; op->syncOp = syncOp; - + SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg)); @@ -796,14 +735,12 @@ _return: CTG_RET(code); } - - -int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) { - int32_t code = 0; +int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_DROP_TB_META; op->syncOp = syncOp; - + SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg)); @@ -827,12 +764,12 @@ _return: CTG_RET(code); } -int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) { - int32_t code = 0; +int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_UPDATE_VGROUP; op->syncOp = syncOp; - + SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg)); @@ -864,12 +801,12 @@ _return: CTG_RET(code); } -int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) { - int32_t code = 0; +int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_UPDATE_TB_META; op->syncOp = syncOp; - + SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg)); @@ -889,7 +826,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; - + _return: if (output) { @@ -898,15 +835,15 @@ _return: } taosMemoryFreeClear(msg); - + CTG_RET(code); } -int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) { - int32_t code = 0; +int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_UPDATE_VG_EPSET; - + SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg)); @@ -923,22 +860,20 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp CTG_ERR_JRET(ctgEnqueue(pCtg, op)); return TSDB_CODE_SUCCESS; - + _return: taosMemoryFreeClear(msg); - + CTG_RET(code); } - - -int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) { - int32_t code = 0; +int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_UPDATE_USER; op->syncOp = syncOp; - + SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); @@ -951,23 +886,23 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp op->data = msg; CTG_ERR_JRET(ctgEnqueue(pCtg, op)); - + return TSDB_CODE_SUCCESS; - + _return: tFreeSGetUserAuthRsp(pAuth); taosMemoryFreeClear(msg); - + CTG_RET(code); } -int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp) { - int32_t code = 0; +int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_UPDATE_TB_INDEX; op->syncOp = syncOp; - + SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg)); @@ -983,22 +918,22 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncO *pIndex = NULL; return TSDB_CODE_SUCCESS; - + _return: taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo); taosMemoryFreeClear(*pIndex); taosMemoryFreeClear(msg); - + CTG_RET(code); } -int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp) { - int32_t code = 0; +int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_DROP_TB_INDEX; op->syncOp = syncOp; - + SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg)); @@ -1012,25 +947,24 @@ int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp) { op->data = msg; CTG_ERR_JRET(ctgEnqueue(pCtg, op)); - + return TSDB_CODE_SUCCESS; - + _return: taosMemoryFreeClear(msg); - + CTG_RET(code); } - -int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp) { - int32_t code = 0; +int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) { + int32_t code = 0; SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); op->opId = CTG_OP_CLEAR_CACHE; op->syncOp = syncOp; op->stopQueue = stopQueue; op->unLocked = true; - + SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg)); @@ -1042,24 +976,23 @@ int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool op->data = msg; CTG_ERR_JRET(ctgEnqueue(pCtg, op)); - + return TSDB_CODE_SUCCESS; - + _return: taosMemoryFreeClear(msg); - + CTG_RET(code); } - int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { mgmt->slotRIdx = 0; mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND; mgmt->type = type; size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum; - + mgmt->slots = taosMemoryCalloc(1, msgSize); if (NULL == mgmt->slots) { qError("calloc %d failed", (int32_t)msgSize); @@ -1067,34 +1000,34 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { } qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum); - + return TSDB_CODE_SUCCESS; } - int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) { int16_t widx = abs((int)(id % mgmt->slotNum)); SCtgRentSlot *slot = &mgmt->slots[widx]; - int32_t code = 0; - + int32_t code = 0; + CTG_LOCK(CTG_WRITE, &slot->lock); if (NULL == slot->meta) { slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size); if (NULL == slot->meta) { - qError("taosArrayInit %d failed, id:0x%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type); + qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, + mgmt->type); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } } if (NULL == taosArrayPush(slot->meta, meta)) { - qError("taosArrayPush meta to rent failed, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } slot->needSort = true; - qDebug("add meta to rent, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); _return: @@ -1102,20 +1035,22 @@ _return: CTG_RET(code); } -int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) { +int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, + __compar_fn_t searchCompare) { int16_t widx = abs((int)(id % mgmt->slotNum)); SCtgRentSlot *slot = &mgmt->slots[widx]; - int32_t code = 0; + int32_t code = 0; CTG_LOCK(CTG_WRITE, &slot->lock); if (NULL == slot->meta) { - qDebug("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } if (slot->needSort) { - qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); + qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, + (int32_t)taosArrayGetSize(slot->meta)); taosArraySort(slot->meta, sortCompare); slot->needSort = false; qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); @@ -1123,20 +1058,22 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ); if (NULL == orig) { - qDebug("meta not found in slot, id:0x%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); + qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, + (int32_t)taosArrayGetSize(slot->meta)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } memcpy(orig, meta, size); - qDebug("meta in rent updated, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); _return: CTG_UNLOCK(CTG_WRITE, &slot->lock); if (code) { - qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type); + qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id, + widx, mgmt->type); CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size)); } @@ -1147,11 +1084,11 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp int16_t widx = abs((int)(id % mgmt->slotNum)); SCtgRentSlot *slot = &mgmt->slots[widx]; - int32_t code = 0; - + int32_t code = 0; + CTG_LOCK(CTG_WRITE, &slot->lock); if (NULL == slot->meta) { - qError("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -1163,13 +1100,13 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ); if (idx < 0) { - qError("meta not found in slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } taosArrayRemove(slot->meta, idx); - qDebug("meta in rent removed, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type); _return: @@ -1178,7 +1115,6 @@ _return: CTG_RET(code); } - int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) { int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1); if (ridx >= mgmt->slotNum) { @@ -1187,8 +1123,8 @@ int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_ } SCtgRentSlot *slot = &mgmt->slots[ridx]; - int32_t code = 0; - + int32_t code = 0; + CTG_LOCK(CTG_READ, &slot->lock); if (NULL == slot->meta) { qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type); @@ -1254,13 +1190,15 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { SCtgDBCache newDBCache = {0}; newDBCache.dbId = dbId; - newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), + true, HASH_ENTRY_LOCK); if (NULL == newDBCache.tbCache) { ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), + true, HASH_ENTRY_LOCK); if (NULL == newDBCache.stbCache) { ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); @@ -1272,21 +1210,21 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { ctgDebug("db already in cache, dbFName:%s", dbFName); goto _return; } - + ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } CTG_CACHE_STAT_INC(numOfDb, 1); - + SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1}; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); - ctgDebug("db added to cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId); + ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId); CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion))); - ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%"PRIx64, dbFName, vgVersion.vgVersion, dbId); + ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId); return TSDB_CODE_SUCCESS; @@ -1297,30 +1235,29 @@ _return: CTG_RET(code); } - -void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) { +void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) { if (NULL == dbCache->stbCache) { return; } - + void *pIter = taosHashIterate(dbCache->stbCache, NULL); while (pIter) { uint64_t *suid = NULL; suid = taosHashGetKey(pIter, NULL); - if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) { - ctgDebug("stb removed from rent, suid:0x%"PRIx64, *suid); + if (TSDB_CODE_SUCCESS == + ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) { + ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid); } - + pIter = taosHashIterate(dbCache->stbCache, pIter); } } - -int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) { +int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) { uint64_t dbId = dbCache->dbId; - - ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbCache->dbId); + + ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbCache->dbId); CTG_LOCK(CTG_WRITE, &dbCache->dbLock); @@ -1331,7 +1268,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock); CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); - ctgDebug("db removed from rent, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId); + ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId); if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) { ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName); @@ -1339,19 +1276,18 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d } CTG_CACHE_STAT_DEC(numOfDb, 1); - ctgInfo("db removed from cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId); - + ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId); + return TSDB_CODE_SUCCESS; } - -int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) { - int32_t code = 0; +int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) { + int32_t code = 0; SCtgDBCache *dbCache = NULL; ctgGetDBCache(pCtg, dbFName, &dbCache); - + if (dbCache) { - // TODO OPEN IT + // TODO OPEN IT #if 0 if (dbCache->dbId == dbId) { *pCache = dbCache; @@ -1368,7 +1304,7 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt *pCache = dbCache; return TSDB_CODE_SUCCESS; } - + if (dbCache->dbId == dbId) { *pCache = dbCache; return TSDB_CODE_SUCCESS; @@ -1376,7 +1312,7 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt #endif CTG_ERR_RET(ctgRemoveDBFromCache(pCtg, dbCache, dbFName)); } - + CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId)); ctgGetDBCache(pCtg, dbFName, &dbCache); @@ -1386,7 +1322,8 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt return TSDB_CODE_SUCCESS; } -int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uint64_t dbId, uint64_t suid, SCtgTbCache* pCache) { +int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid, + SCtgTbCache *pCache) { SSTableVersion metaRent = {.dbId = dbId, .suid = suid}; if (pCache->pMeta) { metaRent.sversion = pCache->pMeta->sversion; @@ -1396,49 +1333,51 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uin if (pCache->pIndex) { metaRent.smaVer = pCache->pIndex->version; } - + strcpy(metaRent.dbFName, dbFName); strcpy(metaRent.stbName, tbName); - - CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion), ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); - ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", - dbFName, dbId, tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer); + CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion), + ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); - return TSDB_CODE_SUCCESS; -} + ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId, + tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer); + return TSDB_CODE_SUCCESS; +} -int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) { +int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, + STableMeta *meta, int32_t metaSize) { if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) { taosMemoryFree(meta); - ctgError("db is dropping, dbId:0x%"PRIx64, dbCache->dbId); + ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId); CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); } - bool isStb = meta->tableType == TSDB_SUPER_TABLE; - SCtgTbCache* pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); - STableMeta *orig = (pCache ? pCache->pMeta : NULL); - int8_t origType = 0; - uint64_t origSuid = 0; - + bool isStb = meta->tableType == TSDB_SUPER_TABLE; + SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); + STableMeta *orig = (pCache ? pCache->pMeta : NULL); + int8_t origType = 0; + uint64_t origSuid = 0; + if (orig) { origType = orig->tableType; - if (origType == meta->tableType && orig->uid == meta->uid && (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) { + if (origType == meta->tableType && orig->uid == meta->uid && + (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) { taosMemoryFree(meta); ctgDebug("ignore table %s meta update", tbName); return TSDB_CODE_SUCCESS; } - + if (origType == TSDB_SUPER_TABLE) { if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) { - ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid); + ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); } else { CTG_CACHE_STAT_DEC(numOfStb, 1); - ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid); + ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid); } - + origSuid = orig->suid; } } @@ -1451,7 +1390,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); } else { taosMemoryFree(pCache->pMeta); @@ -1469,35 +1408,37 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam return TSDB_CODE_SUCCESS; } - if (origSuid != meta->suid && taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) { - ctgError("taosHashPut to stable cache failed, suid:0x%"PRIx64, meta->suid); + if (origSuid != meta->suid && + taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) { + ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } CTG_CACHE_STAT_INC(numOfStb, 1); - ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, meta->tableType); + ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, + meta->tableType); CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache)); - + return TSDB_CODE_SUCCESS; } -int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFName, char *tbName, STableIndex **index) { +int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) { if (NULL == dbCache->tbCache) { ctgFreeSTableIndex(*index); taosMemoryFreeClear(*index); - ctgError("db is dropping, dbId:0x%"PRIx64, dbCache->dbId); + ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId); CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); } - STableIndex* pIndex = *index; - uint64_t suid = pIndex->suid; - SCtgTbCache* pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); + STableIndex *pIndex = *index; + uint64_t suid = pIndex->suid; + SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName)); if (NULL == pCache) { SCtgTbCache cache = {0}; cache.pIndex = pIndex; - + if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(cache)) != 0) { ctgFreeSTableIndex(*index); taosMemoryFreeClear(*index); @@ -1506,12 +1447,13 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa } *index = NULL; - ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, (int32_t)taosArrayGetSize(pIndex->pIndex)); + ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, + (int32_t)taosArrayGetSize(pIndex->pIndex)); if (suid) { CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pIndex->suid, &cache)); } - + return TSDB_CODE_SUCCESS; } @@ -1526,24 +1468,25 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa pCache->pIndex = pIndex; *index = NULL; - ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, (int32_t)taosArrayGetSize(pIndex->pIndex)); + ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, + (int32_t)taosArrayGetSize(pIndex->pIndex)); if (suid) { CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, suid, pCache)); } - + return TSDB_CODE_SUCCESS; } -int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq) { - STableMetaOutput* pOutput = NULL; - int32_t code = 0; - +int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) { + STableMetaOutput *pOutput = NULL; + int32_t code = 0; + CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput)); CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq)); return TSDB_CODE_SUCCESS; - + _return: ctgFreeSTableMetaOutput(pOutput); @@ -1551,11 +1494,11 @@ _return: } void ctgClearAllInstance(void) { - SCatalog* pCtg = NULL; + SCatalog *pCtg = NULL; - void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); + void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); while (pIter) { - pCtg = *(SCatalog**)pIter; + pCtg = *(SCatalog **)pIter; if (pCtg) { ctgClearHandle(pCtg); @@ -1566,11 +1509,11 @@ void ctgClearAllInstance(void) { } void ctgFreeAllInstance(void) { - SCatalog* pCtg = NULL; + SCatalog *pCtg = NULL; - void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); + void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL); while (pIter) { - pCtg = *(SCatalog**)pIter; + pCtg = *(SCatalog **)pIter; if (pCtg) { ctgFreeHandle(pCtg); @@ -1582,51 +1525,51 @@ void ctgFreeAllInstance(void) { taosHashClear(gCtgMgmt.pCluster); } - int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgUpdateVgMsg *msg = operation->data; - SDBVgInfo* dbInfo = msg->dbInfo; - char* dbFName = msg->dbFName; - SCatalog* pCtg = msg->pCtg; - + SDBVgInfo *dbInfo = msg->dbInfo; + char *dbFName = msg->dbFName; + SCatalog *pCtg = msg->pCtg; + if (NULL == dbInfo->vgHash) { goto _return; } - + if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { - ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", - dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); + ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash, + dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); CTG_ERR_JRET(TSDB_CODE_APP_ERROR); } - bool newAdded = false; + bool newAdded = false; SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable}; SCtgDBCache *dbCache = NULL; CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache)); if (NULL == dbCache) { - ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%"PRIx64, dbFName, msg->dbId); + ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } SCtgVgCache *vgCache = &dbCache->vgCache; CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache)); - + if (vgCache->vgInfo) { SDBVgInfo *vgInfo = vgCache->vgInfo; - + if (dbInfo->vgVersion < vgInfo->vgVersion) { ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion); ctgWUnlockVgInfo(dbCache); - + goto _return; } if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) { - ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable); + ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, + dbInfo->numOfTable); ctgWUnlockVgInfo(dbCache); - + goto _return; } @@ -1636,61 +1579,63 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { vgCache->vgInfo = dbInfo; msg->dbInfo = NULL; - ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, dbId:0x%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId); + ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId); ctgWUnlockVgInfo(dbCache); dbCache = NULL; strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); - CTG_ERR_RET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); + CTG_ERR_RET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), + ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); _return: ctgFreeVgInfo(msg->dbInfo); taosMemoryFreeClear(msg); - + CTG_RET(code); } int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgDropDBMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; + SCatalog *pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache); if (NULL == dbCache) { goto _return; } - + if (dbCache->dbId != msg->dbId) { - ctgInfo("dbId already updated, dbFName:%s, dbId:0x%"PRIx64 ", targetId:0x%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId); + ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId, + msg->dbId); goto _return; } - + CTG_ERR_JRET(ctgRemoveDBFromCache(pCtg, dbCache, msg->dbFName)); _return: taosMemoryFreeClear(msg); - + CTG_RET(code); } int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgDropDbVgroupMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; + SCatalog *pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache); if (NULL == dbCache) { goto _return; } - + CTG_ERR_RET(ctgWLockVgInfo(pCtg, dbCache)); - + ctgFreeVgInfo(dbCache->vgCache.vgInfo); dbCache->vgCache.vgInfo = NULL; @@ -1701,17 +1646,16 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) { _return: taosMemoryFreeClear(msg); - + CTG_RET(code); } - int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgUpdateTbMetaMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; - STableMetaOutput* pMeta = msg->pMeta; - SCtgDBCache *dbCache = NULL; + SCatalog *pCtg = msg->pCtg; + STableMetaOutput *pMeta = msg->pMeta; + SCtgDBCache *dbCache = NULL; if ((!CTG_IS_META_CTABLE(pMeta->metaType)) && NULL == pMeta->tbMeta) { ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName); @@ -1721,8 +1665,8 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { if (CTG_IS_META_BOTH(pMeta->metaType) && TSDB_SUPER_TABLE != pMeta->tbMeta->tableType) { ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - + } + CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache)); if (NULL == dbCache) { ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pMeta->dbFName, pMeta->dbId); @@ -1731,17 +1675,19 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) { if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) { int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta); - CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize)); + CTG_ERR_JRET( + ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize)); pMeta->tbMeta = NULL; } if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) { - SCTableMeta* ctbMeta = taosMemoryMalloc(sizeof(SCTableMeta)); + SCTableMeta *ctbMeta = taosMemoryMalloc(sizeof(SCTableMeta)); if (NULL == ctbMeta) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } memcpy(ctbMeta, &pMeta->ctbMeta, sizeof(SCTableMeta)); - CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName, (STableMeta *)ctbMeta, sizeof(SCTableMeta))); + CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName, + (STableMeta *)ctbMeta, sizeof(SCTableMeta))); } _return: @@ -1750,17 +1696,16 @@ _return: taosMemoryFreeClear(pMeta->tbMeta); taosMemoryFreeClear(pMeta); } - + taosMemoryFreeClear(msg); - + CTG_RET(code); } - int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgDropStbMetaMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; + SCatalog *pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; ctgGetDBCache(pCtg, msg->dbFName, &dbCache); @@ -1769,18 +1714,19 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { } if (msg->dbId && (dbCache->dbId != msg->dbId)) { - ctgDebug("dbId already modified, dbFName:%s, current:0x%"PRIx64", dbId:0x%"PRIx64", stb:%s, suid:0x%"PRIx64, + ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid); return TSDB_CODE_SUCCESS; } - + if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) { - ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); + ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, + msg->stbName, msg->suid); } else { CTG_CACHE_STAT_DEC(numOfStb, 1); } - SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName)); + SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName)); if (NULL == pTbCache) { ctgDebug("stb %s already not in cache", msg->stbName); goto _return; @@ -1790,29 +1736,29 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) { ctgFreeTbCacheImpl(pTbCache); CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); - if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) { - ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); + if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) { + ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid); } else { CTG_CACHE_STAT_DEC(numOfTbl, 1); } - - ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); + + ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid); CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)); - - ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid); - + + ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid); + _return: taosMemoryFreeClear(msg); - + CTG_RET(code); } int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgDropTblMetaMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; + SCatalog *pCtg = msg->pCtg; SCtgDBCache *dbCache = NULL; ctgGetDBCache(pCtg, msg->dbFName, &dbCache); @@ -1821,11 +1767,12 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { } if (dbCache->dbId != msg->dbId) { - ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName); + ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, + msg->dbFName, msg->tbName); goto _return; } - SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName)); + SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName)); if (NULL == pTbCache) { ctgDebug("tb %s already not in cache", msg->tbName); goto _return; @@ -1834,7 +1781,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) { CTG_LOCK(CTG_WRITE, &pTbCache->metaLock); ctgFreeTbCacheImpl(pTbCache); CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock); - + if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) { ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); @@ -1852,10 +1799,10 @@ _return: } int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgUpdateUserMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; - + SCatalog *pCtg = msg->pCtg; + SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user)); if (NULL == pUser) { SCtgUserAuth userAuth = {0}; @@ -1899,17 +1846,17 @@ _return: taosHashCleanup(msg->userAuth.createdDbs); taosHashCleanup(msg->userAuth.readDbs); taosHashCleanup(msg->userAuth.writeDbs); - + taosMemoryFreeClear(msg); - + CTG_RET(code); } int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgUpdateEpsetMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; - + SCatalog *pCtg = msg->pCtg; + SCtgDBCache *dbCache = NULL; CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache)); if (NULL == dbCache) { @@ -1919,23 +1866,23 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) { CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache)); - SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo; + SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo; if (NULL == vgInfo) { ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName); goto _return; } - - SVgroupInfo* pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId)); + + SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId)); if (NULL == pInfo) { ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName); goto _return; } - SEp* pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse]; - SEp* pNewEp = &msg->epSet.eps[msg->epSet.inUse]; - ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", - pInfo->vgId, pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port, - msg->epSet.inUse, msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName); + SEp *pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse]; + SEp *pNewEp = &msg->epSet.eps[msg->epSet.inUse]; + ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pInfo->vgId, + pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port, msg->epSet.inUse, + msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName); pInfo->epSet = msg->epSet; @@ -1946,17 +1893,17 @@ _return: } taosMemoryFreeClear(msg); - + CTG_RET(code); } int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgUpdateTbIndexMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; - STableIndex* pIndex = msg->pIndex; - SCtgDBCache *dbCache = NULL; - + SCatalog *pCtg = msg->pCtg; + STableIndex *pIndex = msg->pIndex; + SCtgDBCache *dbCache = NULL; + CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache)); CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex)); @@ -1967,24 +1914,24 @@ _return: taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo); taosMemoryFreeClear(pIndex); } - + taosMemoryFreeClear(msg); - + CTG_RET(code); } int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgDropTbIndexMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; - SCtgDBCache *dbCache = NULL; - + SCatalog *pCtg = msg->pCtg; + SCtgDBCache *dbCache = NULL; + CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache)); if (NULL == dbCache) { return TSDB_CODE_SUCCESS; } - STableIndex* pIndex = taosMemoryCalloc(1, sizeof(STableIndex)); + STableIndex *pIndex = taosMemoryCalloc(1, sizeof(STableIndex)); if (NULL == pIndex) { CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } @@ -2000,17 +1947,16 @@ _return: taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo); taosMemoryFreeClear(pIndex); } - + taosMemoryFreeClear(msg); - + CTG_RET(code); } - int32_t ctgOpClearCache(SCtgCacheOperation *operation) { - int32_t code = 0; + int32_t code = 0; SCtgClearCacheMsg *msg = operation->data; - SCatalog* pCtg = msg->pCtg; + SCatalog *pCtg = msg->pCtg; CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock); @@ -2020,7 +1966,7 @@ int32_t ctgOpClearCache(SCtgCacheOperation *operation) { } else { ctgClearHandle(pCtg); } - + goto _return; } @@ -2033,17 +1979,17 @@ int32_t ctgOpClearCache(SCtgCacheOperation *operation) { _return: CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock); - + taosMemoryFreeClear(msg); - + CTG_RET(code); } void ctgCleanupCacheQueue(void) { - SCtgQNode *node = NULL; - SCtgQNode *nodeNext = NULL; + SCtgQNode *node = NULL; + SCtgQNode *nodeNext = NULL; SCtgCacheOperation *op = NULL; - bool stopQueue = false; + bool stopQueue = false; while (true) { node = gCtgMgmt.queue.head->next; @@ -2055,12 +2001,13 @@ void ctgCleanupCacheQueue(void) { ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name); (*gCtgCacheOperation[op->opId].func)(op); stopQueue = true; - CTG_RT_STAT_INC(numOfOpDequeue, 1); + taosMemoryFree(op->data); + CTG_RT_STAT_INC(numOfOpDequeue, 1); } else { taosMemoryFree(op->data); - CTG_RT_STAT_INC(numOfOpAbort, 1); + CTG_RT_STAT_INC(numOfOpAbort, 1); } - + if (op->syncOp) { tsem_post(&op->rspSem); } else { @@ -2070,7 +2017,7 @@ void ctgCleanupCacheQueue(void) { nodeNext = node->next; taosMemoryFree(node); - + node = nodeNext; } @@ -2085,7 +2032,7 @@ void ctgCleanupCacheQueue(void) { gCtgMgmt.queue.tail = NULL; } -void* ctgUpdateThreadFunc(void* param) { +void *ctgUpdateThreadFunc(void *param) { setThreadName("catalog"); qInfo("catalog update thread started"); @@ -2094,8 +2041,8 @@ void* ctgUpdateThreadFunc(void* param) { if (tsem_wait(&gCtgMgmt.queue.reqSem)) { qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); } - - if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { + + if (atomic_load_8((int8_t *)&gCtgMgmt.exit)) { ctgCleanupCacheQueue(); break; } @@ -2105,7 +2052,7 @@ void* ctgUpdateThreadFunc(void* param) { SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg; ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name); - + (*gCtgCacheOperation[operation->opId].func)(operation); if (operation->syncOp) { @@ -2114,18 +2061,17 @@ void* ctgUpdateThreadFunc(void* param) { taosMemoryFreeClear(operation); } - CTG_RT_STAT_INC(numOfOpDequeue, 1); + CTG_RT_STAT_INC(numOfOpDequeue, 1); ctgdShowCacheInfo(); ctgdShowClusterCache(pCtg); } qInfo("catalog update thread stopped"); - + return NULL; } - int32_t ctgStartUpdateThread() { TdThreadAttr thAttr; taosThreadAttrInit(&thAttr); @@ -2135,13 +2081,12 @@ int32_t ctgStartUpdateThread() { terrno = TAOS_SYSTEM_ERROR(errno); CTG_ERR_RET(terrno); } - + taosThreadAttrDestroy(&thAttr); return TSDB_CODE_SUCCESS; } - -int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) { +int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) { if (IS_SYS_DBNAME(ctx->pName->dbname)) { CTG_FLAG_SET_SYS_DB(ctx->flag); } @@ -2221,14 +2166,15 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe } #endif -int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList) { - int32_t tbNum = taosArrayGetSize(pList); - SName* pName = taosArrayGet(pList, 0); - char dbFName[TSDB_DB_FNAME_LEN] = {0}; - int32_t flag = CTG_FLAG_UNKNOWN_STB; - uint64_t lastSuid = 0; - STableMeta* lastTableMeta = NULL; - +int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx, + int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) { + int32_t tbNum = taosArrayGetSize(pList); + SName *pName = taosArrayGet(pList, 0); + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + int32_t flag = CTG_FLAG_UNKNOWN_STB; + uint64_t lastSuid = 0; + STableMeta *lastTableMeta = NULL; + if (IS_SYS_DBNAME(pName->dbname)) { CTG_FLAG_SET_SYS_DB(flag); strcpy(dbFName, pName->dbname); @@ -2237,9 +2183,9 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe } SCtgDBCache *dbCache = NULL; - SCtgTbCache* pCache = NULL; + SCtgTbCache *pCache = NULL; ctgAcquireDBCache(pCtg, dbFName, &dbCache); - + if (NULL == dbCache) { ctgDebug("db %s not in cache", dbFName); for (int32_t i = 0; i < tbNum; ++i) { @@ -2251,14 +2197,14 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe } for (int32_t i = 0; i < tbNum; ++i) { - SName* pName = taosArrayGet(pList, i); + SName *pName = taosArrayGet(pList, i); pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname)); if (NULL == pCache) { ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); - + continue; } @@ -2267,11 +2213,11 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); - + continue; } - STableMeta* tbMeta = pCache->pMeta; + STableMeta *tbMeta = pCache->pMeta; SCtgTbMetaCtx nctx = {0}; nctx.flag = flag; @@ -2280,8 +2226,8 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe nctx.tbInfo.suid = tbMeta->suid; nctx.tbInfo.tbType = tbMeta->tableType; - SMetaRes res = {0}; - STableMeta* pTableMeta = NULL; + SMetaRes res = {0}; + STableMeta *pTableMeta = NULL; if (tbMeta->tableType != TSDB_CHILD_TABLE) { int32_t metaSize = CTG_META_SIZE(tbMeta); pTableMeta = taosMemoryCalloc(1, metaSize); @@ -2289,20 +2235,20 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + memcpy(pTableMeta, tbMeta, metaSize); - + CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - + taosHashRelease(dbCache->tbCache, pCache); + ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); - + res.pRes = pTableMeta; taosArrayPush(ctx->pResList, &res); continue; } - + // PROCESS FOR CHILD TABLE if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) { @@ -2310,32 +2256,32 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta)); CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - + taosHashRelease(dbCache->tbCache, pCache); + ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName); - + res.pRes = pTableMeta; taosArrayPush(ctx->pResList, &res); - + continue; } - + int32_t metaSize = sizeof(SCTableMeta); pTableMeta = taosMemoryCalloc(1, metaSize); if (NULL == pTableMeta) { - ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); + ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + memcpy(pTableMeta, tbMeta, metaSize); - + CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - - ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", - pName->tname, nctx.tbInfo.tbType, dbFName); - - char* stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid)); + taosHashRelease(dbCache->tbCache, pCache); + + ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname, + nctx.tbInfo.tbType, dbFName); + + char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid)); if (NULL == stName) { ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName); ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); @@ -2349,11 +2295,11 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe if (NULL == pCache) { ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName); taosHashRelease(dbCache->stbCache, stName); - + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); - taosMemoryFreeClear(pTableMeta); + taosMemoryFreeClear(pTableMeta); continue; } @@ -2363,8 +2309,8 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe if (NULL == pCache->pMeta) { ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName); CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - + taosHashRelease(dbCache->tbCache, pCache); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); @@ -2372,14 +2318,15 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe continue; } - - STableMeta* stbMeta = pCache->pMeta; - if (stbMeta->suid != nctx.tbInfo.suid) { + + STableMeta *stbMeta = pCache->pMeta; + if (stbMeta->suid != nctx.tbInfo.suid) { CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - - ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, nctx.tbInfo.suid); - + taosHashRelease(dbCache->tbCache, pCache); + + ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, + nctx.tbInfo.suid); + ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag); taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1); @@ -2387,19 +2334,19 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe continue; } - + metaSize = CTG_META_SIZE(stbMeta); pTableMeta = taosMemoryRealloc(pTableMeta, metaSize); - if (NULL == pTableMeta) { + if (NULL == pTableMeta) { ctgReleaseTbMetaToCache(pCtg, dbCache, pCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - + memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); - + CTG_UNLOCK(CTG_READ, &pCache->metaLock); - taosHashRelease(dbCache->tbCache, pCache); - + taosHashRelease(dbCache->tbCache, pCache); + res.pRes = pTableMeta; taosArrayPush(ctx->pResList, &res); @@ -2408,14 +2355,13 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe } ctgReleaseDBCache(pCtg, dbCache); - + return TSDB_CODE_SUCCESS; } - -int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) { +int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) { int32_t code = 0; - STableMeta* tblMeta = NULL; + STableMeta *tblMeta = NULL; SCtgTbMetaCtx tbCtx = {0}; tbCtx.flag = CTG_FLAG_UNKNOWN_STB; tbCtx.pName = pTableName; @@ -2449,7 +2395,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - SCtgDBCache* dbCache = NULL; + SCtgDBCache *dbCache = NULL; int32_t code = 0; char dbFName[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, dbFName); @@ -2476,5 +2422,3 @@ _return: CTG_RET(code); } - -