diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7630c5f5e5830dbac553d493f9d21a3a7d421f9e..75e819521534d6b30758c0b26374b7adb1ecdabc 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -147,7 +147,7 @@ typedef struct { } SBuildTableMetaInput; typedef struct { - char db[TSDB_TABLE_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; int32_t vgVersion; } SBuildUseDBInput; @@ -746,7 +746,7 @@ typedef struct { typedef struct { char db[TSDB_DB_FNAME_LEN]; - int64_t uid; + uint64_t uid; int32_t vgVersion; int32_t vgNum; int8_t hashMethod; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 70cff7ed1a3624621c6d4794b076bd63d99431d2..9c9e370dcaae1dffb64b29c36a002216d6ffe559 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -59,6 +59,7 @@ typedef struct SSTableMetaVersion { } SSTableMetaVersion; typedef struct SDbVgVersion { + char dbName[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; } SDbVgVersion; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1925f0e3bda380b375b065b183ccce53cc2f994b..53ef6f4f9ba8088a5d83a2b52ef2f3fefa4259f2 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -82,7 +82,7 @@ typedef struct STableMeta { typedef struct SDBVgroupInfo { SRWLatch lock; - int64_t dbId; + uint64_t dbId; int32_t vgVersion; int8_t hashMethod; SHashObj *vgInfo; //key:vgId, value:SVgroupInfo diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 2197fdfd62757218070e56c947c8486bd6f471d8..d052e045524977c36511fba9e2c98f16b831d65d 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -737,7 +737,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm } -int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { +int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName)); if (oldInfo) { CTG_LOCK(CTG_WRITE, &oldInfo->lock); @@ -763,6 +763,48 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD return TSDB_CODE_SUCCESS; } +int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target) { + SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName)); + if (info) { + CTG_LOCK(CTG_WRITE, &info->lock); + if (info->dbId != target->dbId) { + ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + + return TSDB_CODE_SUCCESS; + } + + if (info->vgVersion > target->vgVersion) { + ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + + return TSDB_CODE_SUCCESS; + } + + if (info->vgInfo) { + ctgInfo("cleanup db vgInfo, db:%s", target->dbName); + taosHashCleanup(info->vgInfo); + info->vgInfo = NULL; + } + + if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) { + ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName); + CTG_UNLOCK(CTG_WRITE, &info->lock); + taosHashRelease(pCatalog->dbCache.cache, info); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + CTG_UNLOCK(CTG_WRITE, &info->lock); + + taosHashRelease(pCatalog->dbCache.cache, info); + } + + return TSDB_CODE_SUCCESS; +} + + int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); @@ -1134,19 +1176,6 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - if (dbInfo->vgVersion < 0) { - ctgWarn("db vgVersion less than 0, dbName:%s, vgVersion:%d", dbName, dbInfo->vgVersion); - - if (pCatalog->dbCache.cache) { - CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); - - CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName))); - } - - ctgWarn("db removed from cache, db:%s", dbName); - goto _return; - } - if (NULL == pCatalog->dbCache.cache) { SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); if (NULL == cache) { @@ -1158,8 +1187,52 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB taosHashCleanup(cache); } } else { - CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo)); + CTG_ERR_JRET(ctgValidateAndFreeDbInfo(pCatalog, dbName, dbInfo)); + } + + bool newAdded = false; + if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) { + ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + dbInfo->vgInfo = NULL; + + SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; + if (newAdded) { + CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion))); + } else { + CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + } + + ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion); + + +_return: + + if (dbInfo && dbInfo->vgInfo) { + taosHashCleanup(dbInfo->vgInfo); + dbInfo->vgInfo = NULL; } + + CTG_RET(code); +} + + +int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) { + int32_t code = 0; + + if (NULL == pCatalog || NULL == dbInfo) { + CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); + } + + if (pCatalog->dbCache.cache) { + CTG_ERR_JRET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo)); + + CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName))); + } + + ctgWarn("db removed from cache, db:%s", dbName); bool newAdded = false; if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) { @@ -1189,6 +1262,7 @@ _return: CTG_RET(code); } + int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index a01c3bcf5de3bce63bff4823cfdc182f3b29fb99..35272c0dceac19584e7fd01b5b999659cf8a00fe 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -59,7 +59,7 @@ int32_t ctgTestTagNum = 1; int32_t ctgTestSVersion = 1; int32_t ctgTestTVersion = 1; int32_t ctgTestSuid = 2; -int64_t ctgTestDbId = 33; +uint64_t ctgTestDbId = 33; uint64_t ctgTestClusterId = 0x1; char *ctgTestDbname = "1.db1";