提交 31f3bed3 编写于 作者: D dapan1121

support index cache

上级 041002a5
......@@ -2513,6 +2513,7 @@ typedef struct {
} STableIndexInfo;
typedef struct {
int32_t version;
SArray* pIndex;
} STableIndexRsp;
......
......@@ -2437,6 +2437,7 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) {
......@@ -2471,6 +2472,7 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
......
......@@ -57,6 +57,8 @@ enum {
CTG_OP_DROP_TB_META,
CTG_OP_UPDATE_USER,
CTG_OP_UPDATE_VG_EPSET,
CTG_OP_UPDATE_TB_INDEX,
CTG_OP_DROP_TB_INDEX,
CTG_OP_MAX
};
......@@ -128,19 +130,27 @@ typedef struct SCtgUserCtx {
SUserAuthInfo user;
} SCtgUserCtx;
typedef struct SCtgTbMetaCache {
SRWLatch stbLock;
SRWLatch metaLock; // RC between cache destroy and all other operations
SHashObj *metaCache; //key:tbname, value:STableMeta
SHashObj *stbCache; //key:suid, value:STableMeta*
} SCtgTbMetaCache;
typedef STableIndexRsp STableIndex;
typedef struct SCtgDBCache {
typedef struct SCtgTbCache {
SRWLatch metaLock;
STableMeta *pMeta;
SRWLatch indexLock;
STableIndex *pIndex;
} SCtgTbCache;
typedef struct SCtgVgCache {
SRWLatch vgLock;
SDBVgInfo *vgInfo;
} SCtgVgCache;
typedef struct SCtgDBCache {
SRWLatch dbLock; // RC between destroy tbCache/stbCache and all reads
uint64_t dbId;
int8_t deleted;
SDBVgInfo *vgInfo;
SCtgTbMetaCache tbCache;
SCtgVgCache vgCache;
SHashObj *tbCache; // key:tbname, value:SCtgTbCache
SHashObj *stbCache; // key:suid, value:STableMeta*
} SCtgDBCache;
typedef struct SCtgRentSlot {
......@@ -245,8 +255,10 @@ typedef struct SCtgCacheStat {
uint64_t userNum;
uint64_t vgHitNum;
uint64_t vgMissNum;
uint64_t tblHitNum;
uint64_t tblMissNum;
uint64_t tbMetaHitNum;
uint64_t tbMetaMissNum;
uint64_t tbIndexHitNum;
uint64_t tbIndexMissNum;
uint64_t userHitNum;
uint64_t userMissNum;
} SCtgCacheStat;
......@@ -268,10 +280,10 @@ typedef struct SCtgUpdateVgMsg {
SDBVgInfo* dbInfo;
} SCtgUpdateVgMsg;
typedef struct SCtgUpdateTblMsg {
SCatalog* pCtg;
STableMetaOutput* output;
} SCtgUpdateTblMsg;
typedef struct SCtgUpdateTbMetaMsg {
SCatalog* pCtg;
STableMetaOutput* pMeta;
} SCtgUpdateTbMetaMsg;
typedef struct SCtgDropDBMsg {
SCatalog* pCtg;
......@@ -305,6 +317,19 @@ typedef struct SCtgUpdateUserMsg {
SGetUserAuthRsp userAuth;
} SCtgUpdateUserMsg;
typedef struct SCtgUpdateTbIndexMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
STableIndex* pIndex;
} SCtgUpdateTbIndexMsg;
typedef struct SCtgDropTbIndexMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
} SCtgDropTbIndexMsg;
typedef struct SCtgUpdateEpsetMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
......@@ -465,8 +490,7 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
void ctgReleaseVgInfo(SCtgDBCache *dbCache);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
void ctgRUnlockVgInfo(SCtgDBCache *dbCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
......@@ -479,6 +503,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, SName* pName, STableIndex *pIndex, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
......@@ -521,6 +546,7 @@ void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
char *ctgTaskTypeStr(CTG_TASK_TYPE type);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
extern SCatalogMgmt gCtgMgmt;
......
......@@ -97,8 +97,7 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
if (NULL != dbCache) {
input.dbId = dbCache->dbId;
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
......@@ -383,6 +382,18 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbIndex(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SArray** pRes) {
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pTableName, pRes));
if (*pRes) {
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pRes, NULL));
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pVgList) {
STableMeta *tbMeta = NULL;
int32_t code = 0;
......@@ -443,7 +454,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
......@@ -633,8 +644,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
*dbId = dbCache->dbId;
*tableNum = dbCache->vgInfo->numOfTable;
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
......@@ -659,7 +669,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo));
if (dbCache) {
vgHash = dbCache->vgInfo->vgHash;
vgHash = dbCache->vgCache.vgInfo->vgHash;
} else {
vgHash = vgInfo->vgHash;
}
......@@ -672,7 +682,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
......@@ -940,7 +950,7 @@ int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
......@@ -1143,7 +1153,17 @@ int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo *pConn, const SNam
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
CTG_API_LEAVE(ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pRes, NULL));
int32_t code = 0;
CTG_ERR_JRET(ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pRes, NULL));
SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(*pRes, &pInfo));
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, pTableName, pInfo, false));
_return:
CTG_API_LEAVE(code);
}
......
......@@ -344,6 +344,11 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* p
}
}
for (int32_t i = 0; i < pJob->tbIndexNum; ++i) {
SName* name = taosArrayGet(pReq->pTableIndex, i);
ctgDropTbIndexEnqueue(pCtg, name, true);
}
return TSDB_CODE_SUCCESS;
}
......@@ -524,7 +529,7 @@ int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
}
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
taosArrayPush(pJob->jobRes.pTableHash, &res);
taosArrayPush(pJob->jobRes.pTableIndex, &res);
return TSDB_CODE_SUCCESS;
}
......@@ -687,8 +692,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
} else {
SBuildUseDBInput input = {0};
......@@ -786,7 +790,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
......@@ -866,9 +870,16 @@ _return:
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out);
STableIndex* pOut = (STableIndex*)pTask->msgCtx.out;
SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
pTask->res = pInfo;
pTask->msgCtx.out = NULL;
SCtgTbIndexCtx* ctx = pTask->taskCtx;
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, ctx->pName, pOut, false));
_return:
......@@ -1024,8 +1035,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code);
......@@ -1070,8 +1080,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code);
......@@ -1105,8 +1114,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code);
......@@ -1117,6 +1125,15 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
SArray* pRes = NULL;
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes));
if (pRes) {
pTask->res = pRes;
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
return TSDB_CODE_SUCCESS;
......@@ -1167,8 +1184,7 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
_return:
if (dbCache) {
ctgReleaseVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code);
......
......@@ -59,6 +59,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
CTG_OP_UPDATE_VG_EPSET,
"update epset",
ctgOpUpdateEpset
},
{
CTG_OP_UPDATE_TB_INDEX,
"update tbIndex",
ctgOpUpdateTbIndex
}
};
......@@ -66,11 +71,11 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
CTG_LOCK(CTG_READ, &dbCache->vgLock);
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);
if (dbCache->deleted) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
......@@ -79,8 +84,8 @@ int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
}
if (NULL == dbCache->vgInfo) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
if (NULL == dbCache->vgCache.vgInfo) {
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
*inCache = false;
ctgDebug("db vgInfo is empty, dbId:%"PRIx64, dbCache->dbId);
......@@ -92,50 +97,47 @@ int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgWAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
if (dbCache->deleted) {
ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
return TSDB_CODE_SUCCESS;
}
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
taosHashRelease(pCtg->dbCache, dbCache);
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
void ctgReleaseVgInfo(SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
}
void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
CTG_UNLOCK(CTG_READ, &dbCache->dbLock);
}
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
char *p = strchr(dbFName, '.');
if (p && CTG_IS_SYS_DBNAME(p + 1)) {
dbFName = p + 1;
}
SCtgDBCache *dbCache = NULL;
if (acquire) {
dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
} else {
dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
}
SCtgDBCache *dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
if (NULL == dbCache) {
*pCache = NULL;
ctgDebug("db not in cache, dbFName:%s", dbFName);
return TSDB_CODE_SUCCESS;
}
if (acquire) {
CTG_LOCK(CTG_READ, &dbCache->dbLock);
}
if (dbCache->deleted) {
if (acquire) {
ctgReleaseDBCache(pCtg, dbCache);
......@@ -159,15 +161,35 @@ int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache)
CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}
void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache *dbCache) {
ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache);
}
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
SCtgDBCache *dbCache = NULL;
void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* pCache) {
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);
}
if (NULL == pCtg->dbCache) {
ctgDebug("empty db cache, dbFName:%s", dbFName);
goto _return;
if (dbCache) {
ctgReleaseDBCache(pCtg, dbCache);
}
}
void ctgReleaseTbIndexToCache(SCatalog* pCtg, SCtgDBCache *dbCache, SCtgTbCache* pCache) {
if (pCache) {
CTG_UNLOCK(CTG_READ, &pCache->indexLock);
taosHashRelease(dbCache->tbCache, pCache);
}
if (dbCache) {
ctgReleaseDBCache(pCtg, dbCache);
}
}
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
......@@ -175,7 +197,7 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac
}
bool inCache = false;
ctgAcquireVgInfo(pCtg, dbCache, &inCache);
ctgRLockVgInfo(pCtg, dbCache, &inCache);
if (!inCache) {
ctgDebug("vgInfo of db %s not in cache", dbFName);
goto _return;
......@@ -202,54 +224,107 @@ _return:
return TSDB_CODE_SUCCESS;
}
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
if (NULL == pCtg->dbCache) {
*exist = 0;
ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tbName);
return TSDB_CODE_SUCCESS;
int32_t ctgAcquireTbMetaFromCache(SCatalog* pCtg, char *dbFName, char* tbName, SCtgDBCache **pDb, SCtgTbCache** pTb) {
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", dbFName);
goto _return;
}
int32_t sz = 0;
SCtgTbCache* pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
goto _return;
}
CTG_LOCK(CTG_READ, &pCache->metaLock);
if (NULL == pCache->pMeta) {
ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
goto _return;
}
*pDb = dbCache;
*pTb = pCache;
ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
CTG_CACHE_STAT_INC(tbMetaHitNum, 1);
return TSDB_CODE_SUCCESS;
_return:
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
CTG_CACHE_STAT_INC(tbMetaMissNum, 1);
return TSDB_CODE_SUCCESS;
}
int32_t ctgAcquireTbIndexFromCache(SCatalog* pCtg, char *dbFName, char* tbName, SCtgDBCache **pDb, SCtgTbCache** pTb) {
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
*exist = 0;
return TSDB_CODE_SUCCESS;
ctgDebug("db %s not in cache", dbFName);
goto _return;
}
int32_t sz = 0;
SCtgTbCache* pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) {
ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
goto _return;
}
CTG_LOCK(CTG_READ, &pCache->indexLock);
if (NULL == pCache->pIndex) {
ctgDebug("tb %s index not in cache, dbFName:%s", tbName, dbFName);
goto _return;
}
size_t sz = 0;
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName));
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
*pDb = dbCache;
*pTb = pCache;
ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);
if (NULL == tbMeta) {
ctgReleaseDBCache(pCtg, dbCache);
CTG_CACHE_STAT_INC(tbIndexHitNum, 1);
return TSDB_CODE_SUCCESS;
_return:
ctgReleaseTbIndexToCache(pCtg, dbCache, pCache);
CTG_CACHE_STAT_INC(tbIndexMissNum, 1);
return TSDB_CODE_SUCCESS;
}
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
SCtgDBCache *dbCache = NULL;
ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
*exist = 0;
ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName);
return TSDB_CODE_SUCCESS;
}
*exist = 1;
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
*pTableMeta = NULL;
if (NULL == pCtg->dbCache) {
ctgDebug("empty tbmeta cache, tbName:%s", ctx->pName->tname);
return TSDB_CODE_SUCCESS;
}
char dbFName[TSDB_DB_FNAME_LEN] = {0};
if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
strcpy(dbFName, ctx->pName->dbname);
......@@ -257,78 +332,53 @@ int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta**
tNameGetFullDbName(ctx->pName, dbFName);
}
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %d.%s not in cache", ctx->pName->acctId, ctx->pName->dbname);
return TSDB_CODE_SUCCESS;
}
int32_t sz = 0;
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
taosHashGetDup_m(dbCache->tbCache.metaCache, ctx->pName->tname, strlen(ctx->pName->tname), (void **)pTableMeta, &sz);
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (NULL == *pTableMeta) {
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("tbl not in cache, dbFName:%s, tbName:%s", dbFName, ctx->pName->tname);
ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
STableMeta* tbMeta = *pTableMeta;
STableMeta* tbMeta = tbCache->pMeta;
ctx->tbInfo.inCache = true;
ctx->tbInfo.dbId = dbCache->dbId;
ctx->tbInfo.suid = tbMeta->suid;
ctx->tbInfo.tbType = tbMeta->tableType;
if (tbMeta->tableType != TSDB_CHILD_TABLE) {
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got meta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, dbFName, ctx->pName->tname);
CTG_CACHE_STAT_INC(tblHitNum, 1);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
STableMeta **stbMeta = taosHashGet(dbCache->stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgError("stb not in stbCache, suid:%"PRIx64, tbMeta->suid);
goto _return;
}
if ((*stbMeta)->suid != tbMeta->suid) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
ctgError("stb suid %" PRIx64 " in stbCache mis-match, expected suid:%"PRIx64 , (*stbMeta)->suid, tbMeta->suid);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
int32_t metaSize = CTG_META_SIZE(*stbMeta);
*pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize);
if (NULL == *pTableMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgError("realloc size[%d] failed", metaSize);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
CTG_CACHE_STAT_INC(tblHitNum, 1);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", dbFName, ctx->pName->tname);
ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);
return TSDB_CODE_SUCCESS;
_return:
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
taosMemoryFreeClear(*pTableMeta);
CTG_CACHE_STAT_INC(tblMissNum, 1);
CTG_RET(code);
}
......@@ -338,62 +388,42 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *
*sver = -1;
*tver = -1;
if (NULL == pCtg->dbCache) {
ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname);
return TSDB_CODE_SUCCESS;
}
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName);
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
ctgDebug("db %s not in cache", pTableName->tname);
ctgAcquireTbMetaFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));
if (tbMeta) {
*tbType = tbMeta->tableType;
*suid = tbMeta->suid;
if (*tbType != TSDB_CHILD_TABLE) {
*sver = tbMeta->sversion;
*tver = tbMeta->tversion;
}
}
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (NULL == tbMeta) {
ctgReleaseDBCache(pCtg, dbCache);
return TSDB_CODE_SUCCESS;
}
STableMeta* tbMeta = tbCache->pMeta;
*tbType = tbMeta->tableType;
*suid = tbMeta->suid;
if (*tbType != TSDB_CHILD_TABLE) {
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d tver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tver, *tbType, dbFName, pTableName->tname);
*sver = tbMeta->sversion;
*tver = tbMeta->tversion;
ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:%" PRIx64,
pTableName->tname, dbFName, *tbType, *sver, *tver, *suid);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
ctgDebug("Got subtable meta from cache, dbFName:%s, tbName:%s, suid:%" PRIx64, dbFName, pTableName->tname, *suid);
CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, suid, sizeof(*suid));
STableMeta **stbMeta = taosHashGet(dbCache->stbCache, suid, sizeof(*suid));
if (NULL == stbMeta || NULL == *stbMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("stb not in stbCache, suid:%" PRIx64, *suid);
return TSDB_CODE_SUCCESS;
}
if ((*stbMeta)->suid != *suid) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgError("stable suid in stbCache mis-match, expected suid:%" PRIx64 ",actual suid:%" PRIx64, *suid,
(*stbMeta)->suid);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgError("stb suid %" PRIx64 " in stbCache mis-match, expected suid:%" PRIx64 , (*stbMeta)->suid, *suid);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
......@@ -406,50 +436,53 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *
*sver = (*stbMeta)->sversion;
*tver = (*stbMeta)->tversion;
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got sver %d tver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tver, *tbType, dbFName, pTableName->tname);
ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType, dbFName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, const char* dbFName, const char *tableName, int32_t *tbType) {
if (NULL == pCtg->dbCache) {
ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tableName);
SCtgDBCache *dbCache = NULL;
SCtgTbCache *tbCache = NULL;
ctgAcquireTbMetaFromCache(pCtg, dbFName, tableName, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
*tbType = tbCache->pMeta->tableType;
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tableName, *tbType, dbFName);
return TSDB_CODE_SUCCESS;
}
int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, const SName* pTableName, SArray** pRes) {
int32_t code = 0;
SCtgDBCache *dbCache = NULL;
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
if (NULL == dbCache) {
return TSDB_CODE_SUCCESS;
}
SCtgTbCache *tbCache = NULL;
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pTableName, dbFName);
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, tableName, strlen(tableName));
*pRes = NULL;
if (NULL == pTableMeta) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbFName, tableName);
ctgReleaseDBCache(pCtg, dbCache);
ctgAcquireTbIndexFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
if (NULL == tbCache) {
ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);
return TSDB_CODE_SUCCESS;
}
*tbType = atomic_load_8(&pTableMeta->tableType);
CTG_ERR_JRET(ctgCloneTableIndex(tbCache->pIndex->pIndex, pRes));
taosHashRelease(dbCache->tbCache.metaCache, pTableMeta);
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
_return:
ctgReleaseDBCache(pCtg, dbCache);
ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);
ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbFName, tableName, *tbType);
return TSDB_CODE_SUCCESS;
CTG_RET(code);
}
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
......@@ -718,9 +751,9 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
op->opId = CTG_OP_UPDATE_TB_META;
op->syncOp = syncOp;
SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -730,7 +763,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool sy
}
msg->pCtg = pCtg;
msg->output = output;
msg->pMeta = output;
op->data = msg;
......@@ -805,6 +838,68 @@ _return:
CTG_RET(code);
}
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, SName* pName, STableIndex *pIndex, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_INDEX;
op->syncOp = syncOp;
SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
msg->pCtg = pCtg;
msg->pIndex = pIndex;
tNameGetFullDbName(pName, msg->dbFName);
strcpy(msg->tbName, pName->tname);
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosArrayDestroyEx(pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_TB_INDEX;
op->syncOp = syncOp;
SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
msg->pCtg = pCtg;
tNameGetFullDbName(pName, msg->dbFName);
strcpy(msg->tbName, pName->tname);
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
taosArrayDestroyEx(pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
mgmt->slotRIdx = 0;
mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
......@@ -1006,14 +1101,14 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
SCtgDBCache newDBCache = {0};
newDBCache.dbId = dbId;
newDBCache.tbCache.metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.tbCache.metaCache) {
newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.tbCache) {
ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
newDBCache.tbCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.tbCache.stbCache) {
newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.stbCache) {
ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
}
......@@ -1050,22 +1145,22 @@ _return:
}
void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
CTG_LOCK(CTG_WRITE, &cache->stbLock);
if (cache->stbCache) {
void *pIter = taosHashIterate(cache->stbCache, NULL);
while (pIter) {
uint64_t *suid = NULL;
suid = taosHashGetKey(pIter, NULL);
void ctgRemoveStbRent(SCatalog* pCtg, SCtgDBCache *dbCache) {
if (NULL == dbCache->stbCache) {
return;
}
void *pIter = taosHashIterate(dbCache->stbCache, NULL);
while (pIter) {
uint64_t *suid = NULL;
suid = taosHashGetKey(pIter, NULL);
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
}
pIter = taosHashIterate(cache->stbCache, pIter);
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
}
pIter = taosHashIterate(dbCache->stbCache, pIter);
}
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
}
......@@ -1074,15 +1169,16 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
ctgInfo("start to remove db from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
atomic_store_8(&dbCache->deleted, 1);
ctgRemoveStbRent(pCtg, &dbCache->tbCache);
CTG_LOCK(CTG_WRITE, &dbCache->dbLock);
atomic_store_8(&dbCache->deleted, 1);
ctgRemoveStbRent(pCtg, &dbCache);
ctgFreeDbCache(dbCache);
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
......@@ -1090,7 +1186,6 @@ int32_t ctgRemoveDBFromCache(SCatalog* pCtg, SCtgDBCache *dbCache, const char* d
}
CTG_CACHE_STAT_DEC(dbNum, 1);
ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
return TSDB_CODE_SUCCESS;
......@@ -1138,120 +1233,54 @@ int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCt
return TSDB_CODE_SUCCESS;
}
int32_t ctgWriteDBVgInfoToCache(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo** pDbInfo) {
int32_t code = 0;
SDBVgInfo* dbInfo = *pDbInfo;
if (NULL == dbInfo->vgHash) {
return TSDB_CODE_SUCCESS;
}
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d",
dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
bool newAdded = false;
SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable};
SCtgDBCache *dbCache = NULL;
CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SDBVgInfo *vgInfo = NULL;
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
if (dbCache->vgInfo) {
if (dbInfo->vgVersion < dbCache->vgInfo->vgVersion) {
ctgDebug("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
ctgWReleaseVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
}
if (dbInfo->vgVersion == dbCache->vgInfo->vgVersion && dbInfo->numOfTable == dbCache->vgInfo->numOfTable) {
ctgDebug("no new db vgVersion or numOfTable, dbFName:%s, vgVersion:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable);
ctgWReleaseVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
}
ctgFreeVgInfo(dbCache->vgInfo);
}
dbCache->vgInfo = dbInfo;
*pDbInfo = NULL;
ctgDebug("db vgInfo updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
ctgWReleaseVgInfo(dbCache);
dbCache = NULL;
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
CTG_RET(code);
}
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) {
SCtgTbMetaCache *tbCache = &dbCache->tbCache;
CTG_LOCK(CTG_READ, &tbCache->metaLock);
if (dbCache->deleted || NULL == tbCache->metaCache || NULL == tbCache->stbCache) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
taosMemoryFree(meta);
ctgError("db is dropping, dbId:%"PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
bool isStb = meta->tableType == TSDB_SUPER_TABLE;
SCtgTbCache* pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
STableMeta *orig = (pCache ? pCache->pMeta : NULL);
int8_t origType = 0;
uint64_t origSuid = 0;
bool isStb = meta->tableType == TSDB_SUPER_TABLE;
STableMeta *orig = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
if (orig) {
origType = orig->tableType;
if (origType == meta->tableType && orig->uid == meta->uid && orig->sversion >= meta->sversion && orig->tversion >= meta->tversion) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
taosMemoryFree(meta);
ctgDebug("ignore table %s meta update", tbName);
return TSDB_CODE_SUCCESS;
}
if (origType == TSDB_SUPER_TABLE) {
CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
} else {
CTG_CACHE_STAT_DEC(stblNum, 1);
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
}
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare);
origSuid = orig->suid;
}
}
if (isStb) {
CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
}
if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
if (isStb) {
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
if (NULL == pCache) {
SCtgTbCache cache = {0};
cache.pMeta = meta;
if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) {
taosMemoryFree(meta);
ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
} else {
taosMemoryFree(pCache->pMeta);
pCache->pMeta = meta;
}
if (NULL == orig) {
......@@ -1262,23 +1291,15 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
ctgdShowTableMeta(pCtg, tbName, meta);
if (!isStb) {
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
return TSDB_CODE_SUCCESS;
}
STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) {
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgError("taosHashPut stable to stable cache failed, suid:%"PRIx64, meta->suid);
if (origSuid != meta->suid && taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), &pCache, POINTER_BYTES) != 0) {
ctgError("taosHashPut to stable cache failed, suid:%"PRIx64, meta->suid);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
CTG_CACHE_STAT_INC(stblNum, 1);
CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
......@@ -1290,6 +1311,43 @@ int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFNam
return TSDB_CODE_SUCCESS;
}
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *tbName, STableIndex **index) {
if (NULL == dbCache->tbCache) {
taosMemoryFreeClear(*index);
ctgError("db is dropping, dbId:%"PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
STableIndex* pIndex = *index;
SCtgTbCache* pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
if (NULL == pCache) {
SCtgTbCache cache = {0};
cache.pIndex = pIndex;
if (taosHashPut(table->tbCache, tbName, strlen(tbName), &cache, sizeof(cache)) != 0) {
taosMemoryFreeClear(*index);
ctgError("taosHashPut new tbCache failed, tbName:%s", tbName);
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
*index = NULL;
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, taosArrayGetSize(pIndex->pIndex));
return TSDB_CODE_SUCCESS;
}
if (pCache->pIndex) {
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pCache->pIndex);
}
pCache->pIndex = pIndex;
*index = NULL;
ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version, taosArrayGetSize(pIndex->pIndex));
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq) {
STableMetaOutput* pOutput = NULL;
int32_t code = 0;
......@@ -1305,12 +1363,66 @@ _return:
CTG_RET(code);
}
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateVgMsg *msg = operation->data;
SDBVgInfo* dbInfo = msg->dbInfo;
char* dbFName = msg->dbFName;
CTG_ERR_JRET(ctgWriteDBVgInfoToCache(msg->pCtg, msg->dbFName, msg->dbId, &msg->dbInfo));
if (NULL == dbInfo->vgHash) {
return TSDB_CODE_SUCCESS;
}
if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d",
dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
}
bool newAdded = false;
SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable};
SCtgDBCache *dbCache = NULL;
CTG_ERR_RET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, dbFName, msg->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
SCtgVgCache *vgCache = &dbCache->vgCache;
CTG_ERR_RET(ctgWLockVgInfo(msg->pCtg, dbCache));
if (vgCache->vgInfo) {
SDBVgInfo *vgInfo = vgCache->vgInfo;
if (dbInfo->vgVersion < vgInfo->vgVersion) {
ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion);
ctgWUnlockVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
}
if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) {
ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable);
ctgWUnlockVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
}
ctgFreeVgInfo(vgInfo);
}
vgCache->vgInfo = dbInfo;
msg->dbInfo = NULL;
ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
ctgWUnlockVgInfo(dbCache);
dbCache = NULL;
strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_RET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
_return:
......@@ -1356,14 +1468,14 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
goto _return;
}
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
CTG_ERR_RET(ctgWLockVgInfo(pCtg, dbCache));
ctgFreeVgInfo(dbCache->vgInfo);
dbCache->vgInfo = NULL;
ctgFreeVgInfo(dbCache->vgCache.vgInfo);
dbCache->vgCache.vgInfo = NULL;
ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);
ctgWReleaseVgInfo(dbCache);
ctgWUnlockVgInfo(dbCache);
_return:
......@@ -1375,42 +1487,47 @@ _return:
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateTblMsg *msg = operation->data;
SCtgUpdateTbMetaMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
STableMetaOutput* output = msg->output;
STableMetaOutput* pMeta = msg->pMeta;
SCtgDBCache *dbCache = NULL;
if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
if ((!CTG_IS_META_CTABLE(pMeta->metaType)) && NULL == pMeta->tbMeta) {
ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
if (CTG_IS_META_BOTH(pMeta->metaType) && TSDB_SUPER_TABLE != pMeta->tbMeta->tableType) {
ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache));
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache));
if (NULL == dbCache) {
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId);
ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%" PRIx64, pMeta->dbFName, pMeta->dbId);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (CTG_IS_META_TABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
int32_t metaSize = CTG_META_SIZE(output->tbMeta);
CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, output->dbFName, output->dbId, output->tbName, output->tbMeta, metaSize));
if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta);
CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize));
pMeta->tbMeta = NULL;
}
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, output->dbFName, output->dbId, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
SCTableMeta* ctbMeta = taosMemoryMalloc(sizeof(SCTableMeta));
if (NULL == ctbMeta) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(ctbMeta, &pMeta->ctbMeta, sizeof(SCTableMeta));
CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName, (STableMeta *)ctbMeta, sizeof(SCTableMeta)));
}
_return:
if (output) {
taosMemoryFreeClear(output->tbMeta);
taosMemoryFreeClear(output);
if (pMeta) {
taosMemoryFreeClear(pMeta->tbMeta);
taosMemoryFreeClear(pMeta);
}
taosMemoryFreeClear(msg);
......@@ -1435,22 +1552,17 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) {
if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
} else {
CTG_CACHE_STAT_DEC(stblNum, 1);
}
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) {
if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
} else {
CTG_CACHE_STAT_DEC(tblNum, 1);
}
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
......@@ -1477,21 +1589,18 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
}
if (dbCache->dbId != msg->dbId) {
ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", tbName:%s", msg->dbFName, dbCache->dbId, msg->dbId, msg->tbName);
ctgDebug("dbId %" PRIx64 " not match with curId %"PRIx64", dbFName:%s, tbName:%s"msg->dbId, dbCache->dbId, msg->dbFName, msg->tbName);
return TSDB_CODE_SUCCESS;
}
CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
if (taosHashRemove(dbCache->tbCache.metaCache, msg->tbName, strlen(msg->tbName))) {
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
ctgError("stb not exist in cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} else {
CTG_CACHE_STAT_DEC(tblNum, 1);
}
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
ctgInfo("table removed from cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName);
ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName);
_return:
......@@ -1553,7 +1662,6 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
_return:
taosHashCleanup(msg->userAuth.createdDbs);
taosHashCleanup(msg->userAuth.readDbs);
taosHashCleanup(msg->userAuth.writeDbs);
......@@ -1569,24 +1677,22 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
SCatalog* pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
CTG_ERR_RET(ctgAcquireDBCache(pCtg, msg->dbFName, &dbCache));
CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
if (NULL == dbCache) {
ctgDebug("db %s not exist, ignore epset update", msg->dbFName);
goto _return;
}
SDBVgInfo *vgInfo = NULL;
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
if (NULL == dbCache->vgInfo) {
ctgWReleaseVgInfo(dbCache);
CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo;
if (NULL == vgInfo) {
ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
goto _return;
}
SVgroupInfo* pInfo = taosHashGet(dbCache->vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
SVgroupInfo* pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
if (NULL == pInfo) {
ctgWReleaseVgInfo(dbCache);
ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName);
goto _return;
}
......@@ -1599,12 +1705,10 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
pInfo->epSet = msg->epSet;
ctgWReleaseVgInfo(dbCache);
_return:
if (dbCache) {
ctgReleaseDBCache(msg->pCtg, dbCache);
ctgWUnlockVgInfo(dbCache);
}
taosMemoryFreeClear(msg);
......@@ -1612,6 +1716,32 @@ _return:
CTG_RET(code);
}
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateTbIndexMsg *msg = operation->data;
SCatalog* pCtg = msg->pCtg;
STableIndex* pIndex = msg->pIndex;
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, msg->dbFName, 0, &dbCache));
if (NULL == dbCache) {
CTG_ERR_JRET(code);
}
CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, msg->tbName, &pIndex));
_return:
if (pIndex) {
taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pIndex);
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}
void ctgUpdateThreadUnexpectedStopped(void) {
if (CTG_IS_LOCKED(&gCtgMgmt.lock) > 0) CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
......
......@@ -266,11 +266,11 @@ int32_t ctgdGetStatNum(char *option, void *res) {
}
int32_t ctgdGetTbMetaNum(SCtgDBCache *dbCache) {
return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
return dbCache->table.tbCache ? (int32_t)taosHashGetSize(dbCache->table.tbCache) : 0;
}
int32_t ctgdGetStbNum(SCtgDBCache *dbCache) {
return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0;
return dbCache->table.stbCache ? (int32_t)taosHashGetSize(dbCache->table.stbCache) : 0;
}
int32_t ctgdGetRentNum(SCtgRentMgmt *rent) {
......@@ -363,8 +363,8 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
dbFName = taosHashGetKey(pIter, &len);
int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0;
int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0;
int32_t metaNum = dbCache->table.tbCache ? taosHashGetSize(dbCache->table.tbCache) : 0;
int32_t stbNum = dbCache->table.stbCache ? taosHashGetSize(dbCache->table.stbCache) : 0;
int32_t vgVersion = CTG_DEFAULT_INVALID_VERSION;
int32_t hashMethod = -1;
int32_t vgNum = 0;
......
......@@ -448,10 +448,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
}
if (pTask) {
void* pOut = taosMemoryCalloc(1, POINTER_BYTES);
if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
void* pOut = NULL;
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
......
......@@ -110,25 +110,39 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
taosMemoryFreeClear(mgmt->slots);
}
void ctgFreeStbMetaCache(SCtgDBCache *dbCache) {
if (NULL == dbCache->stbCache) {
return;
}
int32_t stblNum = taosHashGetSize(dbCache->stbCache);
taosHashCleanup(dbCache->stbCache);
dbCache->stbCache = NULL;
CTG_CACHE_STAT_DEC(stblNum, stblNum);
}
void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
taosMemoryFreeClear(pCache->pMeta);
if (pCache->pIndex) {
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pCache->pIndex);
}
}
void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) {
CTG_LOCK(CTG_WRITE, &cache->stbLock);
if (cache->stbCache) {
int32_t stblNum = taosHashGetSize(cache->stbCache);
taosHashCleanup(cache->stbCache);
cache->stbCache = NULL;
CTG_CACHE_STAT_DEC(stblNum, stblNum);
void ctgFreeTbCache(SCtgDBCache *dbCache) {
if (NULL == dbCache->tbCache) {
return;
}
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
CTG_LOCK(CTG_WRITE, &cache->metaLock);
if (cache->metaCache) {
int32_t tblNum = taosHashGetSize(cache->metaCache);
taosHashCleanup(cache->metaCache);
cache->metaCache = NULL;
CTG_CACHE_STAT_DEC(tblNum, tblNum);
int32_t tblNum = taosHashGetSize(dbCache->tbCache);
SCtgTbCache *pCache = taosHashIterate(dbCache->tbCache, NULL);
while (NULL != pCache) {
ctgFreeTbCacheImpl(pCache);
pCache = taosHashIterate(dbCache->tbCache, pCache);
}
CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
taosHashCleanup(dbCache->tbCache);
dbCache->tbCache = NULL;
CTG_CACHE_STAT_DEC(tblNum, tblNum);
}
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
......@@ -144,16 +158,18 @@ void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
taosMemoryFreeClear(vgInfo);
}
void ctgFreeVgInfoCache(SCtgDBCache *dbCache) {
ctgFreeVgInfo(dbCache->vgCache.vgInfo);
}
void ctgFreeDbCache(SCtgDBCache *dbCache) {
if (NULL == dbCache) {
return;
}
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
ctgFreeVgInfo (dbCache->vgInfo);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
ctgFreeTbMetaCache(&dbCache->tbCache);
ctgFreeVgInfoCache(dbCache);
ctgFreeStbMetaCache(dbCache);
ctgFreeTbCache(dbCache);
}
......@@ -167,16 +183,13 @@ void ctgFreeHandle(SCatalog* pCtg) {
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) {
SCtgDBCache *dbCache = pIter;
atomic_store_8(&dbCache->deleted, 1);
ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pCtg->dbCache, pIter);
}
taosHashCleanup(pCtg->dbCache);
CTG_CACHE_STAT_DEC(dbNum, dbNum);
}
......@@ -186,14 +199,12 @@ void ctgFreeHandle(SCatalog* pCtg) {
void *pIter = taosHashIterate(pCtg->userCache, NULL);
while (pIter) {
SCtgUserAuth *userCache = pIter;
ctgFreeSCtgUserAuth(userCache);
pIter = taosHashIterate(pCtg->userCache, pIter);
}
taosHashCleanup(pCtg->userCache);
CTG_CACHE_STAT_DEC(userNum, userNum);
}
......@@ -252,9 +263,9 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
break;
}
case TDMT_MND_GET_TABLE_INDEX: {
SArray** pOut = (SArray**)pCtx->out;
STableIndex* pOut = (STableIndex*)pCtx->out;
if (pOut) {
taosArrayDestroyEx(*pOut, tFreeSTableIndexInfo);
taosArrayDestroyEx(pOut->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pCtx->out);
}
break;
......@@ -640,6 +651,27 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
return TSDB_CODE_SUCCESS;
}
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
if (NULL == pIndex) {
*pRes = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t num = taosArrayGetSize(pIndex);
*pRes = taosArrayInit(num, sizeof(STableIndexInfo));
if (NULL == *pRes) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo *pInfo = taosArrayGet(pIndex, i);
taosArrayPush(*pRes, pInfo);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) {
if (msgType == TDMT_VND_TABLE_META) {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
......
......@@ -895,9 +895,9 @@ void *ctgTestSetCtableMetaThread(void *param) {
output = (STableMetaOutput *)taosMemoryMalloc(sizeof(STableMetaOutput));
ctgTestBuildCTableMetaOutput(output);
SCtgUpdateTblMsg *msg = (SCtgUpdateTblMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
SCtgUpdateTbMetaMsg *msg = (SCtgUpdateTbMetaMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
msg->pCtg = pCtg;
msg->output = output;
msg->pMeta = output;
operation.data = msg;
code = ctgOpUpdateTbMeta(&operation);
......
......@@ -484,13 +484,17 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
STableIndexRsp out = {0};
if (tDeserializeSTableIndexRsp(msg, msgSize, &out) != 0) {
STableIndexRsp *out = taosMemoryCalloc(1, sizeof(STableIndexRsp));
if (NULL == out) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tDeserializeSTableIndexRsp(msg, msgSize, out) != 0) {
qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize);
return TSDB_CODE_INVALID_MSG;
}
*(void **)output = out.pIndex;
*(void **)output = out;
return TSDB_CODE_SUCCESS;
}
......
......@@ -837,7 +837,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWorker *mgmt = qwAcquire(refId);
if (NULL == mgmt) {
QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
taosMemoryFree(param);
return;
}
......
......@@ -1476,6 +1476,11 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
for (uint32_t i = 0; i < info->fields[FLD_TYPE_VALUE].num; ++i) {
SFilterField *field = &info->fields[FLD_TYPE_VALUE].fields[i];
if (field->desc) {
if (QUERY_NODE_VALUE != nodeType(field->desc)) {
qDebug("VAL%d => [type:not value node][val:NIL]", i); //TODO
continue;
}
SValueNode *var = (SValueNode *)field->desc;
SDataType *dType = &var->node.resType;
if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) {
......
......@@ -458,6 +458,9 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
}
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy) {
if (NULL == pSrc) {
return NULL;
}
ASSERT(pSrc->elemSize == sizeof(void*));
SArray* pArray = taosArrayInit(pSrc->size, sizeof(void*));
for (int32_t i = 0; i < pSrc->size; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册