提交 b6b4563a 编写于 作者: D dapan

feature/scheduler

上级 71fdf76b
......@@ -171,6 +171,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SBuildUseDBInput;
typedef struct SField {
......@@ -568,7 +569,7 @@ typedef struct {
char db[TSDB_DB_FNAME_LEN];
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
} SUseDbReq;
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
......
......@@ -82,6 +82,7 @@ typedef struct STableMeta {
typedef struct SDBVgInfo {
int32_t vgVersion;
int8_t hashMethod;
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
} SDBVgInfo;
......@@ -133,7 +134,7 @@ typedef struct SQueryNodeAddr {
} SQueryNodeAddr;
typedef struct SQueryNodeStat {
double tableNum; //table number in million
double tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT
} SQueryNodeStat;
int32_t initTaskQueue();
......
......@@ -224,6 +224,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
SDbVgVersion *db = &dbs[i];
db->dbId = htobe64(db->dbId);
db->vgVersion = htonl(db->vgVersion);
db->numOfTable = htonl(db->numOfTable);
}
SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs};
......
......@@ -1331,7 +1331,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
}
bool newAdded = false;
SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion};
SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable};
SCtgDBCache *dbCache = NULL;
CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache));
......@@ -1344,8 +1344,15 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
if (dbCache->vgInfo) {
if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
if (dbInfo->vgVersion < dbCache->vgInfo->vgVersion) {
ctgDebug("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
ctgWReleaseVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
}
if (dbInfo->vgVersion == dbCache->vgInfo->vgVersion && dbInfo->numOfTable == dbCache->vgInfo->numOfTable) {
ctgDebug("no new db vgVersion or numOfTable, dbFName:%s, vgVersion:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable);
ctgWReleaseVgInfo(dbCache);
return TSDB_CODE_SUCCESS;
......@@ -1511,6 +1518,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const
if (inCache) {
input.dbId = (*dbCache)->dbId;
input.vgVersion = (*dbCache)->vgInfo->vgVersion;
input.numOfTable = (*dbCache)->vgInfo->numOfTable;
} else {
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
}
......
......@@ -42,6 +42,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
for (int32_t i = 0; i < usedbRsp->vgNum; ++i) {
SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i);
pOut->dbVgroup->numOfTable += pVgInfo->numOfTable;
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
......@@ -84,6 +85,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
usedbReq.vgVersion = pInput->vgVersion;
usedbReq.dbId = pInput->dbId;
usedbReq.numOfTable = pInput->numOfTable;
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
void *pBuf = rpcMallocCont(bufLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册