提交 89a6180a 编写于 作者: D dapan1121

enh: add retrieve cached table vgroup and meta api

上级 1138d8ab
...@@ -798,6 +798,9 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); ...@@ -798,6 +798,9 @@ SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
int32_t ctgdGetOneHandle(SCatalog **pHandle); int32_t ctgdGetOneHandle(SCatalog **pHandle);
int ctgVgInfoComp(const void* lp, const void* rp); int ctgVgInfoComp(const void* lp, const void* rp);
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo); int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb);
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache *dbCache, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName);
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache);
extern SCatalogMgmt gCtgMgmt; extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug; extern SCtgDebug gCTGDebug;
......
...@@ -551,6 +551,35 @@ _return: ...@@ -551,6 +551,35 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetCachedTbVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
int32_t code = 0;
char db[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, db);
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
CTG_ERR_RET(ctgAcquireVgMetaFromCache(pCtg, db, pTableName->tname, &dbCache, &tbCache));
if (NULL == dbCache || NULL == tbCache) {
*pTableMeta = NULL;
return TSDB_CODE_SUCCESS;
}
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, pVgroup));
SCtgTbMetaCtx ctx = {0};
ctx.pName = (SName*)pTableName;
ctx.flag = CTG_FLAG_UNKNOWN_STB;
CTG_ERR_JRET(ctgCopyTbMeta(pCtg, &ctx, dbCache, &tbCache, pTableMeta, db));
_return:
ctgReleaseVgMetaToCache(pCtg, dbCache, tbCache);
CTG_RET(code);
}
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName) { int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName) {
int32_t code = 0; int32_t code = 0;
...@@ -1121,7 +1150,7 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, ...@@ -1121,7 +1150,7 @@ int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName,
int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) { int32_t catalogGetCachedTableVgMeta(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, STableMeta** pTableMeta) {
CTG_API_ENTER(); CTG_API_ENTER();
CTG_API_LEAVE(ctgGetTbHashVgroup(pCtg, NULL, pTableName, pVgroup, exists)); CTG_API_LEAVE(ctgGetCachedTbVgMeta(pCtg, pTableName, pVgroup, pTableMeta));
} }
......
...@@ -151,6 +151,16 @@ void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache ...@@ -151,6 +151,16 @@ void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache
} }
} }
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) { int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache); ctgAcquireDBCache(pCtg, dbFName, &dbCache);
...@@ -226,6 +236,75 @@ _return: ...@@ -226,6 +236,75 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
CTG_CACHE_STAT_INC(numOfVgMiss, 1);
goto _return;
}
bool vgInCache = false;
ctgRLockVgInfo(pCtg, dbCache, &vgInCache);
if (!vgInCache) {
ctgDebug("vgInfo of db %s not in cache", dbFName);
CTG_CACHE_STAT_INC(numOfVgMiss, 1);
goto _return;
}
*pDb = dbCache;
CTG_CACHE_STAT_INC(numOfVgHit, 1);
ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
tbCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == tbCache) {
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
goto _return;
}
CTG_LOCK(CTG_READ, &tbCache->metaLock);
if (NULL == tbCache->pMeta) {
ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
goto _return;
}
*pTb = tbCache;
ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(numOfMetaHit, 1);
return TSDB_CODE_SUCCESS;
_return:
if (tbCache) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
taosHashRelease(dbCache->tbCache, tbCache);
}
if (vgInCache) {
ctgRUnlockVgInfo(dbCache);
}
if (dbCache) {
ctgReleaseDBCache(pCtg, dbCache);
}
*pDb = NULL;
*pTb = NULL;
return TSDB_CODE_SUCCESS;
}
/* /*
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) { int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
...@@ -378,25 +457,8 @@ int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32 ...@@ -378,25 +457,8 @@ int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) { int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache *dbCache, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName) {
int32_t code = 0; SCtgTbCache *tbCache = *pTb;
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
*pTableMeta = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
strcpy(dbFName, ctx->pName->dbname);
} else {
tNameGetFullDbName(ctx->pName, dbFName);
}
ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
STableMeta *tbMeta = tbCache->pMeta; STableMeta *tbMeta = tbCache->pMeta;
ctx->tbInfo.inCache = true; ctx->tbInfo.inCache = true;
ctx->tbInfo.dbId = dbCache->dbId; ctx->tbInfo.dbId = dbCache->dbId;
...@@ -407,13 +469,11 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** ...@@ -407,13 +469,11 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
int32_t metaSize = CTG_META_SIZE(tbMeta); int32_t metaSize = CTG_META_SIZE(tbMeta);
*pTableMeta = taosMemoryCalloc(1, metaSize); *pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == *pTableMeta) { if (NULL == *pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(*pTableMeta, tbMeta, metaSize); memcpy(*pTableMeta, tbMeta, metaSize);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName); ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -423,7 +483,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** ...@@ -423,7 +483,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
int32_t metaSize = sizeof(SCTableMeta); int32_t metaSize = sizeof(SCTableMeta);
*pTableMeta = taosMemoryCalloc(1, metaSize); *pTableMeta = taosMemoryCalloc(1, metaSize);
if (NULL == *pTableMeta) { if (NULL == *pTableMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(*pTableMeta, tbMeta, metaSize); memcpy(*pTableMeta, tbMeta, metaSize);
...@@ -433,6 +493,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** ...@@ -433,6 +493,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
if (tbCache) { if (tbCache) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock); CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
taosHashRelease(dbCache->tbCache, tbCache); taosHashRelease(dbCache->tbCache, tbCache);
*pTb = NULL;
} }
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname, ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
...@@ -440,28 +501,53 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta ** ...@@ -440,28 +501,53 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache); ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
if (NULL == tbCache) { if (NULL == tbCache) {
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
taosMemoryFreeClear(*pTableMeta); taosMemoryFreeClear(*pTableMeta);
ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid); ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
*pTb = tbCache;
STableMeta *stbMeta = tbCache->pMeta; STableMeta *stbMeta = tbCache->pMeta;
if (stbMeta->suid != ctx->tbInfo.suid) { if (stbMeta->suid != ctx->tbInfo.suid) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid); ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); taosMemoryFreeClear(*pTableMeta);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
metaSize = CTG_META_SIZE(stbMeta); metaSize = CTG_META_SIZE(stbMeta);
*pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize); *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) { if (NULL == *pTableMeta) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta)); memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
return TSDB_CODE_SUCCESS;
}
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
*pTableMeta = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
strcpy(dbFName, ctx->pName->dbname);
} else {
tNameGetFullDbName(ctx->pName, dbFName);
}
ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, dbCache, &tbCache, pTableMeta, dbFName));
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache); ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName); ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册