未验证 提交 abdd28f3 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #16791 from taosdata/fix/ctgMemleak

fix(ctg): fix memleak
......@@ -13,96 +13,46 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "trpc.h"
#include "query.h"
#include "tname.h"
#include "catalogInt.h"
#include "query.h"
#include "systable.h"
#include "tname.h"
#include "trpc.h"
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
{
CTG_OP_UPDATE_VGROUP,
"update vgInfo",
ctgOpUpdateVgroup
},
{
CTG_OP_UPDATE_TB_META,
"update tbMeta",
ctgOpUpdateTbMeta
},
{
CTG_OP_DROP_DB_CACHE,
"drop DB",
ctgOpDropDbCache
},
{
CTG_OP_DROP_DB_VGROUP,
"drop DBVgroup",
ctgOpDropDbVgroup
},
{
CTG_OP_DROP_STB_META,
"drop stbMeta",
ctgOpDropStbMeta
},
{
CTG_OP_DROP_TB_META,
"drop tbMeta",
ctgOpDropTbMeta
},
{
CTG_OP_UPDATE_USER,
"update user",
ctgOpUpdateUser
},
{
CTG_OP_UPDATE_VG_EPSET,
"update epset",
ctgOpUpdateEpset
},
{
CTG_OP_UPDATE_TB_INDEX,
"update tbIndex",
ctgOpUpdateTbIndex
},
{
CTG_OP_DROP_TB_INDEX,
"drop tbIndex",
ctgOpDropTbIndex
},
{
CTG_OP_CLEAR_CACHE,
"clear cache",
ctgOpClearCache
}
};
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
{CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
{CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
{CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
{CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta},
{CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta},
{CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser},
{CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
{CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
{CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
{CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);
if (dbCache->deleted) {
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
ctgDebug("db is dropping, dbId:0x%"PRIx64, dbCache->dbId);
ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
*inCache = false;
return TSDB_CODE_SUCCESS;
}
if (NULL == dbCache->vgCache.vgInfo) {
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
*inCache = false;
ctgDebug("db vgInfo is empty, dbId:0x%"PRIx64, dbCache->dbId);
ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId);
return TSDB_CODE_SUCCESS;
}
*inCache = true;
return TSDB_CODE_SUCCESS;
}
......@@ -110,7 +60,7 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
if (dbCache->deleted) {
ctgDebug("db is dropping, dbId:0x%"PRIx64, dbCache->dbId);
ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
......@@ -118,19 +68,13 @@ int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
return TSDB_CODE_SUCCESS;
}
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); }
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
}
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock); }
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->dbLock);
}
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->dbLock); }
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
char *p = strchr(dbFName, '.');
if (p && IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1;
......@@ -150,35 +94,35 @@ int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache *
if (dbCache->deleted) {
if (acquire) {
ctgReleaseDBCache(pCtg, dbCache);
}
}
*pCache = NULL;
ctgDebug("db is removing from cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}
int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}
void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache *dbCache) {
void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* pCache) {
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
taosHashRelease(dbCache->tbCache, pCache);
}
if (dbCache) {
......@@ -186,10 +130,10 @@ void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache*
}
}
void ctgReleaseTbIndexToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* pCache) {
void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->indexLock);
taosHashRelease(dbCache->tbCache, pCache);
taosHashRelease(dbCache->tbCache, pCache);
}
if (dbCache) {
......@@ -197,10 +141,10 @@ void ctgReleaseTbIndexToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache*
}
}
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
goto _return;
}
......@@ -217,7 +161,7 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac
CTG_CACHE_STAT_INC(numOfVgHit, 1);
ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
_return:
......@@ -229,19 +173,19 @@ _return:
*pCache = NULL;
CTG_CACHE_STAT_INC(numOfVgMiss, 1);
return TSDB_CODE_SUCCESS;
}
int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, SCtgDBCache **pDb, SCtgTbCache** pTb) {
int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL;
SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
goto _return;
}
pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
......@@ -258,7 +202,7 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, S
*pTb = pCache;
ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(numOfMetaHit, 1);
return TSDB_CODE_SUCCESS;
......@@ -268,20 +212,20 @@ _return:
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
return TSDB_CODE_SUCCESS;
}
int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache** pTb) {
SCtgDBCache* dbCache = NULL;
SCtgTbCache* pCache = NULL;
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL;
SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
goto _return;
}
char* stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
goto _return;
......@@ -304,7 +248,7 @@ int32_t ctgAcquireStbMetaFromCache(SCatalog* pCtg, char *dbFName, uint64_t suid,
*pTb = pCache;
ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
CTG_CACHE_STAT_INC(numOfMetaHit, 1);
return TSDB_CODE_SUCCESS;
......@@ -317,20 +261,19 @@ _return:
*pDb = NULL;
*pTb = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t ctgAcquireTbIndexFromCache(SCatalog* pCtg, char *dbFName, char* tbName, SCtgDBCache **pDb, SCtgTbCache** pTb) {
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL;
SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
goto _return;
}
int32_t sz = 0;
pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) {
......@@ -348,7 +291,7 @@ int32_t ctgAcquireTbIndexFromCache(SCatalog* pCtg, char *dbFName, char* tbName,
*pTb = pCache;
ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(numOfIndexHit, 1);
return TSDB_CODE_SUCCESS;
......@@ -358,32 +301,31 @@ _return:
ctgReleaseTbIndexToCache(pCtg, dbCache, pCache);
CTG_CACHE_STAT_INC(numOfIndexMiss, 1);
return TSDB_CODE_SUCCESS;
}
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
*exist = 0;
return TSDB_CODE_SUCCESS;
}
*exist = 1;
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
int32_t code = 0;
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
SCtgTbCache *tbCache = NULL;
*pTableMeta = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
......@@ -399,12 +341,12 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
return TSDB_CODE_SUCCESS;
}
STableMeta* tbMeta = tbCache->pMeta;
STableMeta *tbMeta = tbCache->pMeta;
ctx->tbInfo.inCache = true;
ctx->tbInfo.dbId = dbCache->dbId;
ctx->tbInfo.suid = tbMeta->suid;
ctx->tbInfo.tbType = tbMeta->tableType;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
int32_t metaSize = CTG_META_SIZE(tbMeta);
*pTableMeta = taosMemoryCalloc(1, metaSize);
......@@ -414,14 +356,14 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
}
memcpy(*pTableMeta, tbMeta, metaSize);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
return TSDB_CODE_SUCCESS;
}
// PROCESS FOR CHILD TABLE
int32_t metaSize = sizeof(SCTableMeta);
*pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == *pTableMeta) {
......@@ -429,10 +371,10 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
}
memcpy(*pTableMeta, tbMeta, metaSize);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s",
ctx->pName->tname, ctx->tbInfo.tbType, dbFName);
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
ctx->tbInfo.tbType, dbFName);
ctgAcquireStbMetaFromCache(pCtg, dbFName, ctx->tbInfo.suid, &dbCache, &tbCache);
if (NULL == tbCache) {
......@@ -441,17 +383,17 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
return TSDB_CODE_SUCCESS;
}
STableMeta* stbMeta = tbCache->pMeta;
if (stbMeta->suid != ctx->tbInfo.suid) {
STableMeta *stbMeta = tbCache->pMeta;
if (stbMeta->suid != ctx->tbInfo.suid) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, ctx->tbInfo.suid);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
metaSize = CTG_META_SIZE(stbMeta);
*pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) {
if (NULL == *pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -461,24 +403,24 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);
return TSDB_CODE_SUCCESS;
_return:
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
taosMemoryFreeClear(*pTableMeta);
CTG_RET(code);
}
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid,
char *stbName) {
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType,
uint64_t *suid, char *stbName) {
*sver = -1;
*tver = -1;
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
SCtgTbCache *tbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName);
......@@ -488,7 +430,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
return TSDB_CODE_SUCCESS;
}
STableMeta* tbMeta = tbCache->pMeta;
STableMeta *tbMeta = tbCache->pMeta;
*tbType = tbMeta->tableType;
*suid = tbMeta->suid;
......@@ -496,29 +438,29 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
*sver = tbMeta->sversion;
*tver = tbMeta->tversion;
ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64,
pTableName->tname, dbFName, *tbType, *sver, *tver, *suid);
ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
dbFName, *tbType, *sver, *tver, *suid);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
// PROCESS FOR CHILD TABLE
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);
ctgAcquireStbMetaFromCache(pCtg, dbFName, *suid, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
return TSDB_CODE_SUCCESS;
}
STableMeta* stbMeta = tbCache->pMeta;
STableMeta *stbMeta = tbCache->pMeta;
if (stbMeta->suid != *suid) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64 , stbMeta->suid, *suid);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -533,15 +475,15 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType, dbFName);
ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
dbFName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tbName, int32_t *tbType) {
int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) {
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
SCtgTbCache *tbCache = NULL;
CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache));
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
......@@ -551,15 +493,15 @@ int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tbName, int3
*tbType = tbCache->pMeta->tableType;
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);
ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, SName* pTableName, SArray** pRes) {
int32_t code = 0;
int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
SCtgTbCache *tbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName);
......@@ -580,14 +522,14 @@ _return:
CTG_RET(code);
}
int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
int32_t ctgChkAuthFromCache(SCatalog *pCtg, char *user, char *dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
char *p = strchr(dbFName, '.');
if (p) {
++p;
} else {
p = dbFName;
}
if (IS_SYS_DBNAME(p)) {
*inCache = true;
*pass = true;
......@@ -605,7 +547,7 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE
ctgDebug("Got user from cache, user:%s", user);
CTG_CACHE_STAT_INC(numOfUserHit, 1);
if (pUser->superUser) {
*pass = true;
return TSDB_CODE_SUCCESS;
......@@ -617,54 +559,53 @@ int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE
CTG_UNLOCK(CTG_READ, &pUser->lock);
return TSDB_CODE_SUCCESS;
}
if (pUser->readDbs && taosHashGet(pUser->readDbs, dbFName, strlen(dbFName)) && type == AUTH_TYPE_READ) {
*pass = true;
}
if (pUser->writeDbs && taosHashGet(pUser->writeDbs, dbFName, strlen(dbFName)) && type == AUTH_TYPE_WRITE) {
*pass = true;
}
CTG_UNLOCK(CTG_READ, &pUser->lock);
return TSDB_CODE_SUCCESS;
_return:
*inCache = false;
CTG_CACHE_STAT_INC(numOfUserMiss, 1);
return TSDB_CODE_SUCCESS;
}
void ctgDequeue(SCtgCacheOperation **op) {
SCtgQNode *orig = gCtgMgmt.queue.head;
SCtgQNode *node = gCtgMgmt.queue.head->next;
gCtgMgmt.queue.head = gCtgMgmt.queue.head->next;
CTG_QUEUE_DEC();
taosMemoryFreeClear(orig);
*op = node->op;
}
int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == node) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
}
bool syncOp = operation->syncOp;
char* opName = gCtgCacheOperation[operation->opId].name;
bool syncOp = operation->syncOp;
char *opName = gCtgCacheOperation[operation->opId].name;
if (operation->syncOp) {
tsem_init(&operation->rspSem, 0, 0);
}
node->op = operation;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
......@@ -699,12 +640,11 @@ int32_t ctgEnqueue(SCatalog* pCtg, SCtgCacheOperation *operation) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0;
int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_CACHE;
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
......@@ -732,12 +672,12 @@ _return:
CTG_RET(code);
}
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0;
int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_DB_VGROUP;
op->syncOp = syncOp;
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
......@@ -764,14 +704,13 @@ _return:
CTG_RET(code);
}
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncOp) {
int32_t code = 0;
int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_STB_META;
op->syncOp = syncOp;
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
......@@ -796,14 +735,12 @@ _return:
CTG_RET(code);
}
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0;
int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_TB_META;
op->syncOp = syncOp;
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
......@@ -827,12 +764,12 @@ _return:
CTG_RET(code);
}
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncOp) {
int32_t code = 0;
int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VGROUP;
op->syncOp = syncOp;
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
......@@ -864,12 +801,12 @@ _return:
CTG_RET(code);
}
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncOp) {
int32_t code = 0;
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_META;
op->syncOp = syncOp;
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
......@@ -889,7 +826,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
if (output) {
......@@ -898,15 +835,15 @@ _return:
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet) {
int32_t code = 0;
int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_VG_EPSET;
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
......@@ -923,22 +860,20 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEp
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
int32_t code = 0;
int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_USER;
op->syncOp = syncOp;
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
......@@ -951,23 +886,23 @@ int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncOp
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
tFreeSGetUserAuthRsp(pAuth);
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp) {
int32_t code = 0;
int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_INDEX;
op->syncOp = syncOp;
SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
......@@ -983,22 +918,22 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncO
*pIndex = NULL;
return TSDB_CODE_SUCCESS;
_return:
taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(*pIndex);
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp) {
int32_t code = 0;
int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_TB_INDEX;
op->syncOp = syncOp;
SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
......@@ -1012,25 +947,24 @@ int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp) {
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
int32_t code = 0;
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_CLEAR_CACHE;
op->syncOp = syncOp;
op->stopQueue = stopQueue;
op->unLocked = true;
SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
......@@ -1042,24 +976,23 @@ int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
mgmt->type = type;
size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
mgmt->slots = taosMemoryCalloc(1, msgSize);
if (NULL == mgmt->slots) {
qError("calloc %d failed", (int32_t)msgSize);
......@@ -1067,34 +1000,34 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
}
qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
return TSDB_CODE_SUCCESS;
}
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
if (NULL == slot->meta) {
qError("taosArrayInit %d failed, id:0x%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx,
mgmt->type);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
if (NULL == taosArrayPush(slot->meta, meta)) {
qError("taosArrayPush meta to rent failed, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
slot->needSort = true;
qDebug("add meta to rent, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
......@@ -1102,20 +1035,22 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
__compar_fn_t searchCompare) {
int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qDebug("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (slot->needSort) {
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type,
(int32_t)taosArrayGetSize(slot->meta));
taosArraySort(slot->meta, sortCompare);
slot->needSort = false;
qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
......@@ -1123,20 +1058,22 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si
void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
if (NULL == orig) {
qDebug("meta not found in slot, id:0x%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type,
(int32_t)taosArrayGetSize(slot->meta));
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
memcpy(orig, meta, size);
qDebug("meta in rent updated, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
CTG_UNLOCK(CTG_WRITE, &slot->lock);
if (code) {
qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
widx, mgmt->type);
CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
}
......@@ -1147,11 +1084,11 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
int16_t widx = abs((int)(id % mgmt->slotNum));
SCtgRentSlot *slot = &mgmt->slots[widx];
int32_t code = 0;
int32_t code = 0;
CTG_LOCK(CTG_WRITE, &slot->lock);
if (NULL == slot->meta) {
qError("empty meta slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -1163,13 +1100,13 @@ int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortComp
int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
if (idx < 0) {
qError("meta not found in slot, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
taosArrayRemove(slot->meta, idx);
qDebug("meta in rent removed, id:0x%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
_return:
......@@ -1178,7 +1115,6 @@ _return:
CTG_RET(code);
}
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
if (ridx >= mgmt->slotNum) {
......@@ -1187,8 +1123,8 @@ int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_
}
SCtgRentSlot *slot = &mgmt->slots[ridx];
int32_t code = 0;
int32_t code = 0;
CTG_LOCK(CTG_READ, &slot->lock);
if (NULL == slot->meta) {
qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
......@@ -1254,13 +1190,15 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
SCtgDBCache newDBCache = {0};
newDBCache.dbId = dbId;
newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.tbCache) {
ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT),
true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.stbCache) {
ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
......@@ -1272,21 +1210,21 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
ctgDebug("db already in cache, dbFName:%s", dbFName);
goto _return;
}
ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_CACHE_STAT_INC(numOfDb, 1);
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
ctgDebug("db added to cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId);
ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%"PRIx64, dbFName, vgVersion.vgVersion, dbId);
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
return TSDB_CODE_SUCCESS;
......@@ -1297,30 +1235,29 @@ _return:
CTG_RET(code);
}
void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) {
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
if (NULL == dbCache->stbCache) {
return;
}
void *pIter = taosHashIterate(dbCache->stbCache, NULL);
while (pIter) {
uint64_t *suid = NULL;
suid = taosHashGetKey(pIter, NULL);
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
ctgDebug("stb removed from rent, suid:0x%"PRIx64, *suid);
if (TSDB_CODE_SUCCESS ==
ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid);
}
pIter = taosHashIterate(dbCache->stbCache, pIter);
}
}
int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
uint64_t dbId = dbCache->dbId;
ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbCache->dbId);
ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbCache->dbId);
CTG_LOCK(CTG_WRITE, &dbCache->dbLock);
......@@ -1331,7 +1268,7 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
ctgDebug("db removed from rent, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId);
ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
......@@ -1339,19 +1276,18 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
}
CTG_CACHE_STAT_DEC(numOfDb, 1);
ctgInfo("db removed from cache, dbFName:%s, dbId:0x%"PRIx64, dbFName, dbId);
ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
int32_t code = 0;
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, dbFName, &dbCache);
if (dbCache) {
// TODO OPEN IT
// TODO OPEN IT
#if 0
if (dbCache->dbId == dbId) {
*pCache = dbCache;
......@@ -1368,7 +1304,7 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
}
if (dbCache->dbId == dbId) {
*pCache = dbCache;
return TSDB_CODE_SUCCESS;
......@@ -1376,7 +1312,7 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt
#endif
CTG_ERR_RET(ctgRemoveDBFromCache(pCtg, dbCache, dbFName));
}
CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));
ctgGetDBCache(pCtg, dbFName, &dbCache);
......@@ -1386,7 +1322,8 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uint64_t dbId, uint64_t suid, SCtgTbCache* pCache) {
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
SCtgTbCache *pCache) {
SSTableVersion metaRent = {.dbId = dbId, .suid = suid};
if (pCache->pMeta) {
metaRent.sversion = pCache->pMeta->sversion;
......@@ -1396,49 +1333,51 @@ int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char* dbFName, char* tbName, uin
if (pCache->pIndex) {
metaRent.smaVer = pCache->pIndex->version;
}
strcpy(metaRent.dbFName, dbFName);
strcpy(metaRent.stbName, tbName);
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion), ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent",
dbFName, dbId, tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer);
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion),
ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
return TSDB_CODE_SUCCESS;
}
ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer);
return TSDB_CODE_SUCCESS;
}
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) {
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
STableMeta *meta, int32_t metaSize) {
if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
taosMemoryFree(meta);
ctgError("db is dropping, dbId:0x%"PRIx64, dbCache->dbId);
ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
bool isStb = meta->tableType == TSDB_SUPER_TABLE;
SCtgTbCache* pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
STableMeta *orig = (pCache ? pCache->pMeta : NULL);
int8_t origType = 0;
uint64_t origSuid = 0;
bool isStb = meta->tableType == TSDB_SUPER_TABLE;
SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
STableMeta *orig = (pCache ? pCache->pMeta : NULL);
int8_t origType = 0;
uint64_t origSuid = 0;
if (orig) {
origType = orig->tableType;
if (origType == meta->tableType && orig->uid == meta->uid && (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
if (origType == meta->tableType && orig->uid == meta->uid &&
(origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
taosMemoryFree(meta);
ctgDebug("ignore table %s meta update", tbName);
return TSDB_CODE_SUCCESS;
}
if (origType == TSDB_SUPER_TABLE) {
if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid);
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
} else {
CTG_CACHE_STAT_DEC(numOfStb, 1);
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%"PRIx64, dbFName, tbName, orig->suid);
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
}
origSuid = orig->suid;
}
}
......@@ -1451,7 +1390,7 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
} else {
taosMemoryFree(pCache->pMeta);
......@@ -1469,35 +1408,37 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
return TSDB_CODE_SUCCESS;
}
if (origSuid != meta->suid && taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) {
ctgError("taosHashPut to stable cache failed, suid:0x%"PRIx64, meta->suid);
if (origSuid != meta->suid &&
taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) {
ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_CACHE_STAT_INC(numOfStb, 1);
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName, meta->tableType);
ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
meta->tableType);
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
return TSDB_CODE_SUCCESS;
}
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFName, char *tbName, STableIndex **index) {
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) {
if (NULL == dbCache->tbCache) {
ctgFreeSTableIndex(*index);
taosMemoryFreeClear(*index);
ctgError("db is dropping, dbId:0x%"PRIx64, dbCache->dbId);
ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
STableIndex* pIndex = *index;
uint64_t suid = pIndex->suid;
SCtgTbCache* pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
STableIndex *pIndex = *index;
uint64_t suid = pIndex->suid;
SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) {
SCtgTbCache cache = {0};
cache.pIndex = pIndex;
if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(cache)) != 0) {
ctgFreeSTableIndex(*index);
taosMemoryFreeClear(*index);
......@@ -1506,12 +1447,13 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa
}
*index = NULL;
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, (int32_t)taosArrayGetSize(pIndex->pIndex));
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
(int32_t)taosArrayGetSize(pIndex->pIndex));
if (suid) {
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pIndex->suid, &cache));
}
return TSDB_CODE_SUCCESS;
}
......@@ -1526,24 +1468,25 @@ int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char* dbFNa
pCache->pIndex = pIndex;
*index = NULL;
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, (int32_t)taosArrayGetSize(pIndex->pIndex));
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
(int32_t)taosArrayGetSize(pIndex->pIndex));
if (suid) {
CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, suid, pCache));
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq) {
STableMetaOutput* pOutput = NULL;
int32_t code = 0;
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
STableMetaOutput *pOutput = NULL;
int32_t code = 0;
CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput));
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq));
return TSDB_CODE_SUCCESS;
_return:
ctgFreeSTableMetaOutput(pOutput);
......@@ -1551,11 +1494,11 @@ _return:
}
void ctgClearAllInstance(void) {
SCatalog* pCtg = NULL;
SCatalog *pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog**)pIter;
pCtg = *(SCatalog **)pIter;
if (pCtg) {
ctgClearHandle(pCtg);
......@@ -1566,11 +1509,11 @@ void ctgClearAllInstance(void) {
}
void ctgFreeAllInstance(void) {
SCatalog* pCtg = NULL;
SCatalog *pCtg = NULL;
void* pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
while (pIter) {
pCtg = *(SCatalog**)pIter;
pCtg = *(SCatalog **)pIter;
if (pCtg) {
ctgFreeHandle(pCtg);
......@@ -1582,51 +1525,51 @@ void ctgFreeAllInstance(void) {
taosHashClear(gCtgMgmt.pCluster);
}
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgUpdateVgMsg *msg = operation->data;
SDBVgInfo* dbInfo = msg->dbInfo;
char* dbFName = msg->dbFName;
SCatalog* pCtg = msg->pCtg;
SDBVgInfo *dbInfo = msg->dbInfo;
char *dbFName = msg->dbFName;
SCatalog *pCtg = msg->pCtg;
if (NULL == dbInfo->vgHash) {
goto _return;
}
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d",
dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
}
bool newAdded = false;
bool newAdded = false;
SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable};
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%"PRIx64, dbFName, msg->dbId);
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SCtgVgCache *vgCache = &dbCache->vgCache;
CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));
if (vgCache->vgInfo) {
SDBVgInfo *vgInfo = vgCache->vgInfo;
if (dbInfo->vgVersion < vgInfo->vgVersion) {
ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion);
ctgWUnlockVgInfo(dbCache);
goto _return;
}
if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) {
ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable);
ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion,
dbInfo->numOfTable);
ctgWUnlockVgInfo(dbCache);
goto _return;
}
......@@ -1636,61 +1579,63 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
vgCache->vgInfo = dbInfo;
msg->dbInfo = NULL;
ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, dbId:0x%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
ctgWUnlockVgInfo(dbCache);
dbCache = NULL;
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_RET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
_return:
ctgFreeVgInfo(msg->dbInfo);
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgDropDBMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
goto _return;
}
if (dbCache->dbId != msg->dbId) {
ctgInfo("dbId already updated, dbFName:%s, dbId:0x%"PRIx64 ", targetId:0x%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId);
ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId,
msg->dbId);
goto _return;
}
CTG_ERR_JRET(ctgRemoveDBFromCache(pCtg, dbCache, msg->dbFName));
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgDropDbVgroupMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
goto _return;
}
CTG_ERR_RET(ctgWLockVgInfo(pCtg, dbCache));
CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
ctgFreeVgInfo(dbCache->vgCache.vgInfo);
dbCache->vgCache.vgInfo = NULL;
......@@ -1701,17 +1646,16 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgUpdateTbMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
STableMetaOutput* pMeta = msg->pMeta;
SCtgDBCache *dbCache = NULL;
SCatalog *pCtg = msg->pCtg;
STableMetaOutput *pMeta = msg->pMeta;
SCtgDBCache *dbCache = NULL;
if ((!CTG_IS_META_CTABLE(pMeta->metaType)) && NULL == pMeta->tbMeta) {
ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName);
......@@ -1721,8 +1665,8 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
if (CTG_IS_META_BOTH(pMeta->metaType) && TSDB_SUPER_TABLE != pMeta->tbMeta->tableType) {
ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
}
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pMeta->dbFName, pMeta->dbId);
......@@ -1731,17 +1675,19 @@ int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta);
CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize));
CTG_ERR_JRET(
ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize));
pMeta->tbMeta = NULL;
}
if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
SCTableMeta* ctbMeta = taosMemoryMalloc(sizeof(SCTableMeta));
SCTableMeta *ctbMeta = taosMemoryMalloc(sizeof(SCTableMeta));
if (NULL == ctbMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(ctbMeta, &pMeta->ctbMeta, sizeof(SCTableMeta));
CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName, (STableMeta *)ctbMeta, sizeof(SCTableMeta)));
CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName,
(STableMeta *)ctbMeta, sizeof(SCTableMeta)));
}
_return:
......@@ -1750,37 +1696,37 @@ _return:
taosMemoryFreeClear(pMeta->tbMeta);
taosMemoryFreeClear(pMeta);
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgDropStbMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
if (NULL == dbCache) {
return TSDB_CODE_SUCCESS;
goto _return;
}
if (msg->dbId && (dbCache->dbId != msg->dbId)) {
ctgDebug("dbId already modified, dbFName:%s, current:0x%"PRIx64", dbId:0x%"PRIx64", stb:%s, suid:0x%"PRIx64,
ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
return TSDB_CODE_SUCCESS;
goto _return;
}
if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
msg->stbName, msg->suid);
} else {
CTG_CACHE_STAT_DEC(numOfStb, 1);
}
SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
if (NULL == pTbCache) {
ctgDebug("stb %s already not in cache", msg->stbName);
goto _return;
......@@ -1790,29 +1736,29 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
ctgFreeTbCacheImpl(pTbCache);
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
} else {
CTG_CACHE_STAT_DEC(numOfTbl, 1);
}
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgDropTblMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
......@@ -1821,11 +1767,12 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
}
if (dbCache->dbId != msg->dbId) {
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%"PRIx64", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName);
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
msg->dbFName, msg->tbName);
goto _return;
}
SCtgTbCache* pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName));
SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName));
if (NULL == pTbCache) {
ctgDebug("tb %s already not in cache", msg->tbName);
goto _return;
......@@ -1834,7 +1781,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
ctgFreeTbCacheImpl(pTbCache);
CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
......@@ -1852,10 +1799,10 @@ _return:
}
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgUpdateUserMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCatalog *pCtg = msg->pCtg;
SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user));
if (NULL == pUser) {
SCtgUserAuth userAuth = {0};
......@@ -1899,17 +1846,17 @@ _return:
taosHashCleanup(msg->userAuth.createdDbs);
taosHashCleanup(msg->userAuth.readDbs);
taosHashCleanup(msg->userAuth.writeDbs);
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgUpdateEpsetMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
if (NULL == dbCache) {
......@@ -1919,23 +1866,23 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo;
SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo;
if (NULL == vgInfo) {
ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
goto _return;
}
SVgroupInfo* pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
if (NULL == pInfo) {
ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName);
goto _return;
}
SEp* pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse];
SEp* pNewEp = &msg->epSet.eps[msg->epSet.inUse];
ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg",
pInfo->vgId, pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port,
msg->epSet.inUse, msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);
SEp *pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse];
SEp *pNewEp = &msg->epSet.eps[msg->epSet.inUse];
ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pInfo->vgId,
pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port, msg->epSet.inUse,
msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);
pInfo->epSet = msg->epSet;
......@@ -1946,17 +1893,17 @@ _return:
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgUpdateTbIndexMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
STableIndex* pIndex = msg->pIndex;
SCtgDBCache *dbCache = NULL;
SCatalog *pCtg = msg->pCtg;
STableIndex *pIndex = msg->pIndex;
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));
......@@ -1967,24 +1914,24 @@ _return:
taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pIndex);
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgDropTbIndexMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
if (NULL == dbCache) {
return TSDB_CODE_SUCCESS;
}
STableIndex* pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
STableIndex *pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pIndex) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -2000,17 +1947,16 @@ _return:
taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pIndex);
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
int32_t code = 0;
int32_t code = 0;
SCtgClearCacheMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
SCatalog *pCtg = msg->pCtg;
CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
......@@ -2020,7 +1966,7 @@ int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
} else {
ctgClearHandle(pCtg);
}
goto _return;
}
......@@ -2033,17 +1979,17 @@ int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
_return:
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
taosMemoryFreeClear(msg);
CTG_RET(code);
}
void ctgCleanupCacheQueue(void) {
SCtgQNode *node = NULL;
SCtgQNode *nodeNext = NULL;
SCtgQNode *node = NULL;
SCtgQNode *nodeNext = NULL;
SCtgCacheOperation *op = NULL;
bool stopQueue = false;
bool stopQueue = false;
while (true) {
node = gCtgMgmt.queue.head->next;
......@@ -2055,12 +2001,12 @@ void ctgCleanupCacheQueue(void) {
ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name);
(*gCtgCacheOperation[op->opId].func)(op);
stopQueue = true;
CTG_RT_STAT_INC(numOfOpDequeue, 1);
CTG_RT_STAT_INC(numOfOpDequeue, 1);
} else {
taosMemoryFree(op->data);
CTG_RT_STAT_INC(numOfOpAbort, 1);
CTG_RT_STAT_INC(numOfOpAbort, 1);
}
if (op->syncOp) {
tsem_post(&op->rspSem);
} else {
......@@ -2070,7 +2016,7 @@ void ctgCleanupCacheQueue(void) {
nodeNext = node->next;
taosMemoryFree(node);
node = nodeNext;
}
......@@ -2085,7 +2031,7 @@ void ctgCleanupCacheQueue(void) {
gCtgMgmt.queue.tail = NULL;
}
void* ctgUpdateThreadFunc(void* param) {
void *ctgUpdateThreadFunc(void *param) {
setThreadName("catalog");
qInfo("catalog update thread started");
......@@ -2094,8 +2040,8 @@ void* ctgUpdateThreadFunc(void* param) {
if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
}
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
if (atomic_load_8((int8_t *)&gCtgMgmt.exit)) {
ctgCleanupCacheQueue();
break;
}
......@@ -2105,7 +2051,7 @@ void* ctgUpdateThreadFunc(void* param) {
SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg;
ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name);
(*gCtgCacheOperation[operation->opId].func)(operation);
if (operation->syncOp) {
......@@ -2114,18 +2060,17 @@ void* ctgUpdateThreadFunc(void* param) {
taosMemoryFreeClear(operation);
}
CTG_RT_STAT_INC(numOfOpDequeue, 1);
CTG_RT_STAT_INC(numOfOpDequeue, 1);
ctgdShowCacheInfo();
ctgdShowClusterCache(pCtg);
}
qInfo("catalog update thread stopped");
return NULL;
}
int32_t ctgStartUpdateThread() {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
......@@ -2135,13 +2080,12 @@ int32_t ctgStartUpdateThread() {
terrno = TAOS_SYSTEM_ERROR(errno);
CTG_ERR_RET(terrno);
}
taosThreadAttrDestroy(&thAttr);
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
if (IS_SYS_DBNAME(ctx->pName->dbname)) {
CTG_FLAG_SET_SYS_DB(ctx->flag);
}
......@@ -2221,14 +2165,15 @@ int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
}
#endif
int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, int32_t dbIdx, int32_t *fetchIdx, int32_t baseResIdx, SArray* pList) {
int32_t tbNum = taosArrayGetSize(pList);
SName* pName = taosArrayGet(pList, 0);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB;
uint64_t lastSuid = 0;
STableMeta* lastTableMeta = NULL;
int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
int32_t tbNum = taosArrayGetSize(pList);
SName *pName = taosArrayGet(pList, 0);
char dbFName[TSDB_DB_FNAME_LEN] = {0};
int32_t flag = CTG_FLAG_UNKNOWN_STB;
uint64_t lastSuid = 0;
STableMeta *lastTableMeta = NULL;
if (IS_SYS_DBNAME(pName->dbname)) {
CTG_FLAG_SET_SYS_DB(flag);
strcpy(dbFName, pName->dbname);
......@@ -2237,9 +2182,9 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
}
SCtgDBCache *dbCache = NULL;
SCtgTbCache* pCache = NULL;
SCtgTbCache *pCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
for (int32_t i = 0; i < tbNum; ++i) {
......@@ -2251,14 +2196,14 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
}
for (int32_t i = 0; i < tbNum; ++i) {
SName* pName = taosArrayGet(pList, i);
SName *pName = taosArrayGet(pList, i);
pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
continue;
}
......@@ -2267,11 +2212,11 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
continue;
}
STableMeta* tbMeta = pCache->pMeta;
STableMeta *tbMeta = pCache->pMeta;
SCtgTbMetaCtx nctx = {0};
nctx.flag = flag;
......@@ -2280,8 +2225,8 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
nctx.tbInfo.suid = tbMeta->suid;
nctx.tbInfo.tbType = tbMeta->tableType;
SMetaRes res = {0};
STableMeta* pTableMeta = NULL;
SMetaRes res = {0};
STableMeta *pTableMeta = NULL;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
int32_t metaSize = CTG_META_SIZE(tbMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
......@@ -2289,20 +2234,20 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
continue;
}
// PROCESS FOR CHILD TABLE
if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
......@@ -2310,32 +2255,32 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
continue;
}
int32_t metaSize = sizeof(SCTableMeta);
pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pTableMeta, tbMeta, metaSize);
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s",
pName->tname, nctx.tbInfo.tbType, dbFName);
char* stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
taosHashRelease(dbCache->tbCache, pCache);
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname,
nctx.tbInfo.tbType, dbFName);
char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
if (NULL == stName) {
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
......@@ -2349,11 +2294,11 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
if (NULL == pCache) {
ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
taosHashRelease(dbCache->stbCache, stName);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
taosMemoryFreeClear(pTableMeta);
taosMemoryFreeClear(pTableMeta);
continue;
}
......@@ -2363,8 +2308,8 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
if (NULL == pCache->pMeta) {
ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
taosHashRelease(dbCache->tbCache, pCache);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
......@@ -2372,14 +2317,15 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
continue;
}
STableMeta* stbMeta = pCache->pMeta;
if (stbMeta->suid != nctx.tbInfo.suid) {
STableMeta *stbMeta = pCache->pMeta;
if (stbMeta->suid != nctx.tbInfo.suid) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%"PRIx64 , stbMeta->suid, nctx.tbInfo.suid);
taosHashRelease(dbCache->tbCache, pCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid,
nctx.tbInfo.suid);
ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
......@@ -2387,19 +2333,19 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
continue;
}
metaSize = CTG_META_SIZE(stbMeta);
pTableMeta = taosMemoryRealloc(pTableMeta, metaSize);
if (NULL == pTableMeta) {
if (NULL == pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
taosHashRelease(dbCache->tbCache, pCache);
res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
......@@ -2408,14 +2354,13 @@ int32_t ctgGetTbMetasFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMe
}
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) {
int32_t code = 0;
STableMeta* tblMeta = NULL;
STableMeta *tblMeta = NULL;
SCtgTbMetaCtx tbCtx = {0};
tbCtx.flag = CTG_FLAG_UNKNOWN_STB;
tbCtx.pName = pTableName;
......@@ -2449,7 +2394,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
SCtgDBCache* dbCache = NULL;
SCtgDBCache *dbCache = NULL;
int32_t code = 0;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName);
......@@ -2476,5 +2421,3 @@ _return:
CTG_RET(code);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册