提交 7c46aea9 编写于 作者: D dapan1121

add tversion

上级 f68c5a8f
......@@ -101,6 +101,7 @@ typedef struct SDbVgVersion {
typedef struct STbSVersion {
char* tbFName;
int32_t sver;
int32_t tver;
} STbSVersion;
typedef struct SUserAuthVersion {
......
......@@ -413,7 +413,7 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
for (int32_t i = 0; i < tbNum; ++i) {
STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion};
STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion};
taosArrayPush(pArray, &tbSver);
}
}
......
......@@ -1503,6 +1503,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
pRsp->precision = pDb->cfg.precision;
pRsp->tableType = TSDB_SUPER_TABLE;
pRsp->sversion = pStb->colVer;
pRsp->tversion = pStb->tagVer;
pRsp->suid = pStb->uid;
pRsp->tuid = pStb->uid;
......@@ -1629,7 +1630,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int
metaRsp.suid = pStbVersion->suid;
}
if (pStbVersion->sversion != metaRsp.sversion) {
if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) {
taosArrayPush(batchMetaRsp.pArray, &metaRsp);
} else {
tFreeSTableMetaRsp(&metaRsp);
......
......@@ -447,7 +447,7 @@ void ctgReleaseVgInfo(SCtgDBCache *dbCache);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
......
......@@ -812,6 +812,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
SName name;
int32_t sver = 0;
int32_t tver = 0;
int32_t tbNum = taosArrayGetSize(pTables);
for (int32_t i = 0; i < tbNum; ++i) {
STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
......@@ -828,8 +829,8 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
int32_t tbType = 0;
uint64_t suid = 0;
char stbName[TSDB_TABLE_FNAME_LEN];
ctgReadTbSverFromCache(pCtg, &name, &sver, &tbType, &suid, stbName);
if (sver >= 0 && sver < pTb->sver) {
ctgReadTbVerFromCache(pCtg, &name, &sver, &tver, &tbType, &suid, stbName);
if ((sver >= 0 && sver < pTb->sver) || (tver >= 0 && tver < pTb->tver)) {
switch (tbType) {
case TSDB_CHILD_TABLE: {
SName stb = name;
......
......@@ -322,9 +322,10 @@ _return:
CTG_RET(code);
}
int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid,
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid,
char *stbName) {
*sver = -1;
*tver = -1;
if (NULL == pCtg->dbCache) {
ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname);
......@@ -348,6 +349,7 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t
*suid = tbMeta->suid;
if (*tbType != TSDB_CHILD_TABLE) {
*sver = tbMeta->sversion;
*tver = tbMeta->tversion;
}
}
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
......@@ -359,7 +361,7 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t
if (*tbType != TSDB_CHILD_TABLE) {
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname);
ctgDebug("Got sver %d tver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tver, *tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......@@ -391,12 +393,13 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t
stbName[nameLen] = 0;
*sver = (*stbMeta)->sversion;
*tver = (*stbMeta)->tversion;
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname);
ctgDebug("Got sver %d tver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tver, *tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册