未验证 提交 56c5eb29 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #18005 from taosdata/enh/TD-20288

enh: support vgroup epset update
......@@ -190,6 +190,7 @@ typedef struct {
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs;
} SBuildUseDBInput;
typedef struct SField {
......
......@@ -56,6 +56,7 @@ typedef struct SDbInfo {
int32_t vgVer;
int32_t tbNum;
int64_t dbId;
int64_t stateTs;
} SDbInfo;
typedef struct STablesReq {
......@@ -153,7 +154,7 @@ int32_t catalogInit(SCatalogCfg* cfg);
*/
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum);
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum, int64_t* stateTs);
/**
* Get a DB's all vgroup info.
......
......@@ -127,6 +127,7 @@ typedef struct SDBVgInfo {
int16_t hashSuffix;
int8_t hashMethod;
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
int64_t stateTs;
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
} SDBVgInfo;
......
......@@ -61,7 +61,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, rsp->uid);
if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
......@@ -72,6 +72,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
}
vgInfo->vgVersion = rsp->vgVersion;
vgInfo->stateTs = rsp->stateTs;
vgInfo->hashMethod = rsp->hashMethod;
vgInfo->hashPrefix = rsp->hashPrefix;
vgInfo->hashSuffix = rsp->hashSuffix;
......@@ -486,6 +487,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
db->dbId = htobe64(db->dbId);
db->vgVersion = htonl(db->vgVersion);
db->numOfTable = htonl(db->numOfTable);
db->stateTs = htobe64(db->stateTs);
}
SKv kv = {
......
......@@ -181,7 +181,7 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
struct SCatalog* pCatalog = NULL;
if (usedbRsp.vgVersion >= 0) {
if (usedbRsp.vgVersion >= 0) { // cached in local
uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
if (code1 != TSDB_CODE_SUCCESS) {
......
......@@ -1175,7 +1175,7 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable) {
if (pReq == NULL || pReq->vgVersion < pDb->vgVersion || pReq->dbId != pDb->uid || numOfTable != pReq->numOfTable || pReq->stateTs < pDb->stateTs) {
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->pVgroupInfos);
}
......@@ -1283,12 +1283,31 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
for (int32_t i = 0; i < numOfDbs; ++i) {
SDbVgVersion *pDbVgVersion = &pDbs[i];
pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId);
pDbVgVersion->dbId = be64toh(pDbVgVersion->dbId);
pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion);
pDbVgVersion->numOfTable = htonl(pDbVgVersion->numOfTable);
pDbVgVersion->stateTs = be64toh(pDbVgVersion->stateTs);
SUseDbRsp usedbRsp = {0};
if ((0 == strcasecmp(pDbVgVersion->dbFName, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcasecmp(pDbVgVersion->dbFName, TSDB_PERFORMANCE_SCHEMA_DB)))) {
memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN);
int32_t vgVersion = mndGetGlobalVgroupVersion(pMnode);
if (pDbVgVersion->vgVersion < vgVersion) {
usedbRsp.pVgroupInfos = taosArrayInit(10, sizeof(SVgroupInfo));
mndBuildDBVgroupInfo(NULL, pMnode, usedbRsp.pVgroupInfos);
usedbRsp.vgVersion = vgVersion++;
} else {
usedbRsp.vgVersion = pDbVgVersion->vgVersion;
}
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
continue;
}
SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName);
if (pDb == NULL) {
mTrace("db:%s, no exist", pDbVgVersion->dbFName);
......@@ -1301,8 +1320,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable /* &&
pDbVgVersion->stateTs == pDb->stateTs */) {
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable &&
pDbVgVersion->stateTs == pDb->stateTs) {
mTrace("db:%s, valid dbinfo, vgVersion:%d stateTs:%" PRId64
" numOfTables:%d, not changed vgVersion:%d stateTs:%" PRId64 " numOfTables:%d",
pDbVgVersion->dbFName, pDbVgVersion->vgVersion, pDbVgVersion->stateTs, pDbVgVersion->numOfTable,
......
......@@ -715,10 +715,10 @@ _return:
CTG_API_LEAVE(code);
}
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum) {
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t* tableNum, int64_t* pStateTs) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) {
if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId || NULL == tableNum || NULL == pStateTs) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
......
......@@ -1998,6 +1998,7 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) {
pInfo->vgVer = dbCache->vgCache.vgInfo->vgVersion;
pInfo->dbId = dbCache->dbId;
pInfo->tbNum = dbCache->vgCache.vgInfo->numOfTable;
pInfo->stateTs = dbCache->vgCache.vgInfo->stateTs;
ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL;
......
......@@ -1231,14 +1231,16 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
CTG_CACHE_STAT_INC(numOfDb, 1);
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0};
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));
if (!IS_SYS_DBNAME(dbFName)) {
CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
}
return TSDB_CODE_SUCCESS;
......@@ -1563,7 +1565,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
}
bool newAdded = false;
SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable};
SDbVgVersion vgVersion = {.dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};
SCtgDBCache *dbCache = NULL;
CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
......@@ -1579,15 +1581,15 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
SDBVgInfo *vgInfo = vgCache->vgInfo;
if (dbInfo->vgVersion < vgInfo->vgVersion) {
ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion);
ctgDebug("db updateVgroup is ignored, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, vgInfo->vgVersion);
ctgWUnlockVgInfo(dbCache);
goto _return;
}
if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable) {
ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion,
dbInfo->numOfTable);
if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable && dbInfo->stateTs == vgInfo->stateTs) {
ctgDebug("no new db vgroup update info, dbFName:%s, vgVer:%d, numOfTable:%d, stateTs:%" PRId64, dbFName, dbInfo->vgVersion,
dbInfo->numOfTable, dbInfo->stateTs);
ctgWUnlockVgInfo(dbCache);
goto _return;
......@@ -1599,15 +1601,17 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
vgCache->vgInfo = dbInfo;
msg->dbInfo = NULL;
ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, stateTs:%" PRId64 ", dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, vgVersion.stateTs, vgVersion.dbId);
ctgWUnlockVgInfo(dbCache);
dbCache = NULL;
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
if (!IS_SYS_DBNAME(dbFName)) {
tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
}
_return:
......@@ -2170,7 +2174,6 @@ void *ctgUpdateThreadFunc(void *param) {
CTG_RT_STAT_INC(numOfOpDequeue, 1);
ctgdShowCacheInfo();
ctgdShowClusterCache(pCtg);
}
qInfo("catalog update thread stopped");
......
......@@ -78,7 +78,7 @@ void ctgdUserCallback(SMetaData *pResult, void *param, int32_t code) {
num = taosArrayGetSize(pResult->pDbInfo);
for (int32_t i = 0; i < num; ++i) {
SDbInfo *pDb = taosArrayGet(pResult->pDbInfo, i);
qDebug("db %d dbInfo: vgVer:%d, tbNum:%d, dbId:0x%" PRIx64, i, pDb->vgVer, pDb->tbNum, pDb->dbId);
qDebug("db %d dbInfo: vgVer:%d, tbNum:%d, stateTs:%" PRId64 " dbId:0x%" PRIx64, i, pDb->vgVer, pDb->tbNum, pDb->stateTs, pDb->dbId);
}
} else {
qDebug("empty db info");
......@@ -462,6 +462,7 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) {
int32_t hashMethod = -1;
int16_t hashPrefix = 0;
int16_t hashSuffix = 0;
int64_t stateTs = 0;
int32_t vgNum = 0;
if (dbCache->vgCache.vgInfo) {
......@@ -469,16 +470,35 @@ void ctgdShowDBCache(SCatalog *pCtg, SHashObj *dbHash) {
hashMethod = dbCache->vgCache.vgInfo->hashMethod;
hashPrefix = dbCache->vgCache.vgInfo->hashPrefix;
hashSuffix = dbCache->vgCache.vgInfo->hashSuffix;
stateTs = dbCache->vgCache.vgInfo->stateTs;
if (dbCache->vgCache.vgInfo->vgHash) {
vgNum = taosHashGetSize(dbCache->vgCache.vgInfo->vgHash);
}
}
ctgDebug("[%d] db [%.*s][0x%" PRIx64
"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, prefix:%d, suffix:%d, vgNum:%d",
i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted ? "deleted" : "", metaNum, stbNum, vgVersion,
"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, stateTs:%" PRId64 ", hashMethod:%d, prefix:%d, suffix:%d, vgNum:%d",
i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted ? "deleted" : "", metaNum, stbNum, vgVersion, stateTs,
hashMethod, hashPrefix, hashSuffix, vgNum);
if (dbCache->vgCache.vgInfo) {
int32_t i = 0;
void *pVgIter = taosHashIterate(dbCache->vgCache.vgInfo->vgHash, NULL);
while (pVgIter) {
SVgroupInfo * pVg = (SVgroupInfo *)pVgIter;
ctgDebug("The %04dth VG [id:%d, hashBegin:%u, hashEnd:%u, numOfTable:%d, epNum:%d, inUse:%d]",
i++, pVg->vgId, pVg->hashBegin, pVg->hashEnd, pVg->numOfTable, pVg->epSet.numOfEps, pVg->epSet.inUse);
for (int32_t n = 0; n < pVg->epSet.numOfEps; ++n) {
SEp *pEp = &pVg->epSet.eps[n];
ctgDebug("\tEp %d [fqdn:%s, port:%d]", n, pEp->fqdn, pEp->port);
}
pVgIter = taosHashIterate(dbCache->vgCache.vgInfo->vgHash, pVgIter);
}
}
pIter = taosHashIterate(dbHash, pIter);
}
}
......
......@@ -2486,7 +2486,8 @@ TEST(dbVgroup, getSetDbVgroupCase) {
int32_t dbVer = 0;
int64_t dbId = 0;
int32_t tbNum = 0;
code = catalogGetDBVgVersion(pCtg, ctgTestDbname, &dbVer, &dbId, &tbNum);
int64_t stateTs = 0;
code = catalogGetDBVgVersion(pCtg, ctgTestDbname, &dbVer, &dbId, &tbNum, &stateTs);
ASSERT_EQ(code, 0);
ASSERT_EQ(dbVer, ctgTestVgVersion);
ASSERT_EQ(dbId, ctgTestDbId);
......
......@@ -108,7 +108,7 @@ int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, S
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo);
int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName, SVgroupInfo* pVgroup);
int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum);
int32_t* pTableNum, int64_t* pStateTs);
int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo);
int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, const char* pDbFName, AUTH_TYPE type,
bool* pPass);
......
......@@ -466,14 +466,14 @@ static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName,
}
static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum) {
int32_t* pTableNum, int64_t* pStateTs) {
SParseContext* pParCxt = pCxt->pParseCxt;
int32_t code = collectUseDatabaseImpl(pDbFName, pCxt->pDbs);
if (TSDB_CODE_SUCCESS == code) {
if (pParCxt->async) {
code = getDbVgVersionFromCache(pCxt->pMetaCache, pDbFName, pVersion, pDbId, pTableNum);
code = getDbVgVersionFromCache(pCxt->pMetaCache, pDbFName, pVersion, pDbId, pTableNum, pStateTs);
} else {
code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum);
code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum, pStateTs);
}
}
if (TSDB_CODE_SUCCESS != code) {
......@@ -4969,7 +4969,7 @@ static int32_t translateUseDatabase(STranslateContext* pCxt, SUseDatabaseStmt* p
SName name = {0};
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameExtractFullName(&name, usedbReq.db);
int32_t code = getDBVgVersion(pCxt, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable);
int32_t code = getDBVgVersion(pCxt, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId, &usedbReq.numOfTable, &usedbReq.stateTs);
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_USE_DB, (FSerializeFunc)tSerializeSUseDbReq, &usedbReq);
}
......
......@@ -876,13 +876,14 @@ int32_t reserveDbVgVersionInCache(int32_t acctId, const char* pDb, SParseMetaCac
}
int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, int32_t* pVersion, int64_t* pDbId,
int32_t* pTableNum) {
int32_t* pTableNum, int64_t* pStateTs) {
SDbInfo* pDbInfo = NULL;
int32_t code = getMetaDataFromHash(pDbFName, strlen(pDbFName), pMetaCache->pDbInfo, (void**)&pDbInfo);
if (TSDB_CODE_SUCCESS == code) {
*pVersion = pDbInfo->vgVer;
*pDbId = pDbInfo->dbId;
*pTableNum = pDbInfo->tbNum;
*pStateTs = pDbInfo->stateTs;
}
return code;
}
......
......@@ -249,7 +249,7 @@ int32_t __catalogGetTableDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, con
}
int32_t __catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId,
int32_t* tableNum) {
int32_t* tableNum, int64_t* stateTs) {
return 0;
}
......
......@@ -41,8 +41,9 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
pOut->dbVgroup->hashMethod = usedbRsp->hashMethod;
pOut->dbVgroup->hashPrefix = usedbRsp->hashPrefix;
pOut->dbVgroup->hashSuffix = usedbRsp->hashSuffix;
pOut->dbVgroup->stateTs = usedbRsp->stateTs;
qDebug("Got %d vgroup for db %s", usedbRsp->vgNum, usedbRsp->db);
qDebug("Got %d vgroup for db %s, vgVersion:%d, stateTs:%" PRId64, usedbRsp->vgNum, usedbRsp->db, usedbRsp->vgVersion, usedbRsp->stateTs);
if (usedbRsp->vgNum <= 0) {
return TSDB_CODE_SUCCESS;
......@@ -103,6 +104,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
usedbReq.vgVersion = pInput->vgVersion;
usedbReq.dbId = pInput->dbId;
usedbReq.numOfTable = pInput->numOfTable;
usedbReq.stateTs = pInput->stateTs;
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
void *pBuf = (*mallcFp)(bufLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册