未验证 提交 a84e2aa4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4657 from taosdata/feature/wal

TD-2270
...@@ -44,7 +44,7 @@ static void bnUnLock() { ...@@ -44,7 +44,7 @@ static void bnUnLock() {
static bool bnCheckFree(SDnodeObj *pDnode) { static bool bnCheckFree(SDnodeObj *pDnode) {
if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) { if (pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) {
mError("dnode:%d, status:%s not available", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status)); mError("dnode:%d, status:%s not available", pDnode->dnodeId, dnodeStatus[pDnode->status]);
return false; return false;
} }
...@@ -92,13 +92,12 @@ static void bnDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) { ...@@ -92,13 +92,12 @@ static void bnDiscardVnode(SVgObj *pVgroup, SVnodeGid *pVnodeGid) {
} }
static void bnSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) { static void bnSwapVnodeGid(SVnodeGid *pVnodeGid1, SVnodeGid *pVnodeGid2) {
// SVnodeGid tmp = *pVnodeGid1; SVnodeGid tmp = *pVnodeGid1;
// *pVnodeGid1 = *pVnodeGid2; *pVnodeGid1 = *pVnodeGid2;
// *pVnodeGid2 = tmp; *pVnodeGid2 = tmp;
} }
int32_t bnAllocVnodes(SVgObj *pVgroup) { int32_t bnAllocVnodes(SVgObj *pVgroup) {
static int32_t randIndex = 0;
int32_t dnode = 0; int32_t dnode = 0;
int32_t vnodes = 0; int32_t vnodes = 0;
...@@ -120,8 +119,7 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) { ...@@ -120,8 +119,7 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) {
break; break;
} else { } else {
mDebug("dnode:%d, is not selected, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId, mDebug("dnode:%d, is not selected, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId,
mnodeGetDnodeStatusStr(pDnode->status), pDnode->openVnodes, pDnode->diskAvailable, dnodeStatus[pDnode->status], pDnode->openVnodes, pDnode->diskAvailable, pDnode->alternativeRole);
pDnode->alternativeRole);
} }
} }
} }
...@@ -137,7 +135,7 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) { ...@@ -137,7 +135,7 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) {
while (1) { while (1) {
pIter = mnodeGetNextDnode(pIter, &pDnode); pIter = mnodeGetNextDnode(pIter, &pDnode);
if (pDnode == NULL) break; if (pDnode == NULL) break;
mDebug("dnode:%d, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status), mDebug("dnode:%d, status:%s vnodes:%d disk:%fGB role:%d", pDnode->dnodeId, dnodeStatus[pDnode->status],
pDnode->openVnodes, pDnode->diskAvailable, pDnode->alternativeRole); pDnode->openVnodes, pDnode->diskAvailable, pDnode->alternativeRole);
mnodeDecDnodeRef(pDnode); mnodeDecDnodeRef(pDnode);
} }
...@@ -149,36 +147,6 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) { ...@@ -149,36 +147,6 @@ int32_t bnAllocVnodes(SVgObj *pVgroup) {
} }
} }
/*
* make the choice more random.
* replica 1: no choice
* replica 2: there are 2 combinations
* replica 3 or larger: there are 6 combinations
*/
if (pVgroup->numOfVnodes == 1) {
} else if (pVgroup->numOfVnodes == 2) {
if (randIndex++ % 2 == 0) {
bnSwapVnodeGid(pVgroup->vnodeGid, pVgroup->vnodeGid + 1);
}
} else {
int32_t randVal = randIndex++ % 6;
if (randVal == 1) { // 1, 0, 2
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
} else if (randVal == 2) { // 1, 2, 0
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 1);
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
} else if (randVal == 3) { // 2, 1, 0
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
} else if (randVal == 4) { // 2, 0, 1
bnSwapVnodeGid(pVgroup->vnodeGid + 0, pVgroup->vnodeGid + 2);
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
}
if (randVal == 5) { // 0, 2, 1
bnSwapVnodeGid(pVgroup->vnodeGid + 1, pVgroup->vnodeGid + 2);
} else {
} // 0, 1, 2
}
bnReleaseDnodes(); bnReleaseDnodes();
bnUnLock(); bnUnLock();
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -214,44 +182,8 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) { ...@@ -214,44 +182,8 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
static int32_t bnRemoveVnode(SVgObj *pVgroup) { static int32_t bnRemoveVnode(SVgObj *pVgroup) {
if (pVgroup->numOfVnodes <= 1) return -1; if (pVgroup->numOfVnodes <= 1) return -1;
SVnodeGid *pRmVnode = NULL; SVnodeGid *pSelVnode = &pVgroup->vnodeGid[pVgroup->numOfVnodes - 1];
SVnodeGid *pSelVnode = NULL; mDebug("vgId:%d, vnode in dnode:%d will be dropped", pVgroup->vgId, pSelVnode->dnodeId);
int32_t maxScore = 0;
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
SVnodeGid *pVnode = &(pVgroup->vnodeGid[i]);
SDnodeObj *pDnode = mnodeGetDnode(pVnode->dnodeId);
if (pDnode == NULL) {
mError("vgId:%d, dnode:%d not exist, remove it", pVgroup->vgId, pVnode->dnodeId);
pRmVnode = pVnode;
break;
}
if (pDnode->status == TAOS_DN_STATUS_DROPPING) {
mDebug("vgId:%d, dnode:%d in dropping state", pVgroup->vgId, pVnode->dnodeId);
pRmVnode = pVnode;
} else if (pVnode->dnodeId == pVgroup->lbDnodeId) {
mDebug("vgId:%d, dnode:%d in updating state", pVgroup->vgId, pVnode->dnodeId);
pRmVnode = pVnode;
} else {
if (pSelVnode == NULL) {
pSelVnode = pVnode;
maxScore = pDnode->score;
} else {
if (maxScore < pDnode->score) {
pSelVnode = pVnode;
maxScore = pDnode->score;
}
}
}
mnodeDecDnodeRef(pDnode);
}
if (pRmVnode != NULL) {
pSelVnode = pRmVnode;
}
if (!bnCheckVgroupReady(pVgroup, pSelVnode)) { if (!bnCheckVgroupReady(pVgroup, pSelVnode)) {
mDebug("vgId:%d, is not ready", pVgroup->vgId); mDebug("vgId:%d, is not ready", pVgroup->vgId);
...@@ -275,36 +207,42 @@ static bool bnCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) { ...@@ -275,36 +207,42 @@ static bool bnCheckDnodeInVgroup(SDnodeObj *pDnode, SVgObj *pVgroup) {
return false; return false;
} }
/** static SDnodeObj *bnGetAvailDnode(SVgObj *pVgroup) {
* desc: add vnode to vgroup, find a new one if dest dnode is null for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
**/ SDnodeObj *pDnode = tsBnDnodes.list[i];
static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) { if (bnCheckDnodeInVgroup(pDnode, pVgroup)) continue;
if (pDestDnode == NULL) { if (!bnCheckFree(pDnode)) continue;
for (int32_t i = 0; i < tsBnDnodes.size; ++i) {
SDnodeObj *pDnode = tsBnDnodes.list[i]; mDebug("vgId:%d, add vnode to dnode:%d", pVgroup->vgId, pDnode->dnodeId);
if (pDnode == pSrcDnode) continue; return pDnode;
if (bnCheckDnodeInVgroup(pDnode, pVgroup)) continue;
if (!bnCheckFree(pDnode)) continue;
pDestDnode = pDnode;
mDebug("vgId:%d, add vnode to dnode:%d", pVgroup->vgId, pDnode->dnodeId);
break;
}
} }
if (pDestDnode == NULL) { return NULL;
}
static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDestDnode) {
if (pDestDnode == NULL || pSrcDnode == pDestDnode) {
return TSDB_CODE_MND_DNODE_NOT_EXIST; return TSDB_CODE_MND_DNODE_NOT_EXIST;
} }
SVnodeGid *pVnodeGid = pVgroup->vnodeGid + pVgroup->numOfVnodes; SVnodeGid vnodeGids[TSDB_MAX_REPLICA];
pVnodeGid->dnodeId = pDestDnode->dnodeId; memcpy(&vnodeGids, &pVgroup->vnodeGid, sizeof(SVnodeGid) * TSDB_MAX_REPLICA);
pVnodeGid->pDnode = pDestDnode;
pVgroup->numOfVnodes++;
if (pSrcDnode != NULL) { int32_t numOfVnodes = pVgroup->numOfVnodes;
pVgroup->lbDnodeId = pSrcDnode->dnodeId; vnodeGids[numOfVnodes].dnodeId = pDestDnode->dnodeId;
vnodeGids[numOfVnodes].pDnode = pDestDnode;
numOfVnodes++;
for (int32_t v = 0; v < numOfVnodes; ++v) {
if (pSrcDnode != NULL && pSrcDnode->dnodeId == vnodeGids[v].dnodeId) {
bnSwapVnodeGid(&vnodeGids[v], &vnodeGids[numOfVnodes - 1]);
pVgroup->lbDnodeId = pSrcDnode->dnodeId;
break;
}
} }
memcpy(&pVgroup->vnodeGid, &vnodeGids, sizeof(SVnodeGid) * TSDB_MAX_REPLICA);
pVgroup->numOfVnodes = numOfVnodes;
atomic_add_fetch_32(&pDestDnode->openVnodes, 1); atomic_add_fetch_32(&pDestDnode->openVnodes, 1);
mnodeUpdateVgroup(pVgroup); mnodeUpdateVgroup(pVgroup);
...@@ -315,16 +253,16 @@ static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDes ...@@ -315,16 +253,16 @@ static int32_t bnAddVnode(SVgObj *pVgroup, SDnodeObj *pSrcDnode, SDnodeObj *pDes
static bool bnMonitorBalance() { static bool bnMonitorBalance() {
if (tsBnDnodes.size < 2) return false; if (tsBnDnodes.size < 2) return false;
mDebug("monitor dnodes for balance, avail:%d", tsBnDnodes.size);
for (int32_t src = tsBnDnodes.size - 1; src >= 0; --src) { for (int32_t src = tsBnDnodes.size - 1; src >= 0; --src) {
SDnodeObj *pDnode = tsBnDnodes.list[src]; SDnodeObj *pDnode = tsBnDnodes.list[src];
mDebug("%d-dnode:%d, state:%s, score:%.1f, numOfCores:%d, openVnodes:%d", tsBnDnodes.size - src - 1, mDebug("%d-dnode:%d, state:%s, score:%.1f, cores:%d, vnodes:%d", tsBnDnodes.size - src - 1, pDnode->dnodeId,
pDnode->dnodeId, mnodeGetDnodeStatusStr(pDnode->status), pDnode->score, pDnode->numOfCores, dnodeStatus[pDnode->status], pDnode->score, pDnode->numOfCores, pDnode->openVnodes);
pDnode->openVnodes);
} }
float scoresDiff = tsBnDnodes.list[tsBnDnodes.size - 1]->score - tsBnDnodes.list[0]->score; float scoresDiff = tsBnDnodes.list[tsBnDnodes.size - 1]->score - tsBnDnodes.list[0]->score;
if (scoresDiff < 0.01) { if (scoresDiff < 0.01) {
mDebug("all dnodes:%d is already balanced, scoresDiff:%f", tsBnDnodes.size, scoresDiff); mDebug("all dnodes:%d is already balanced, scoreDiff:%.1f", tsBnDnodes.size, scoresDiff);
return false; return false;
} }
...@@ -412,7 +350,13 @@ static int32_t bnMonitorVgroups() { ...@@ -412,7 +350,13 @@ static int32_t bnMonitorVgroups() {
} else if (vgReplica < dbReplica) { } else if (vgReplica < dbReplica) {
mInfo("vgId:%d, replica:%d numOfVnodes:%d, try add one vnode", pVgroup->vgId, dbReplica, vgReplica); mInfo("vgId:%d, replica:%d numOfVnodes:%d, try add one vnode", pVgroup->vgId, dbReplica, vgReplica);
hasUpdatingVgroup = true; hasUpdatingVgroup = true;
code = bnAddVnode(pVgroup, NULL, NULL);
SDnodeObj *pAvailDnode = bnGetAvailDnode(pVgroup);
if (pAvailDnode == NULL) {
code = TSDB_CODE_MND_DNODE_NOT_EXIST;
} else {
code = bnAddVnode(pVgroup, NULL, pAvailDnode);
}
} }
mnodeDecVgroupRef(pVgroup); mnodeDecVgroupRef(pVgroup);
......
...@@ -299,7 +299,7 @@ static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void ...@@ -299,7 +299,7 @@ static int32_t bnRetrieveScores(SShowObj *pShow, char *data, int32_t rows, void
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, mnodeGetDnodeStatusStr(pDnode->status)); STR_TO_VARSTR(pWrite, dnodeStatus[pDnode->status]);
cols++; cols++;
numOfRows++; numOfRows++;
......
...@@ -129,7 +129,8 @@ static void *dnodeProcessMgmtQueue(void *wparam) { ...@@ -129,7 +129,8 @@ static void *dnodeProcessMgmtQueue(void *wparam) {
static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
SCreateVnodeMsg *pCreate = rpcMsg->pCont; SCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId); pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.cfgVersion = htonl(pCreate->cfg.cfgVersion); pCreate->cfg.dbCfgVersion = htonl(pCreate->cfg.dbCfgVersion);
pCreate->cfg.vgCfgVersion = htonl(pCreate->cfg.vgCfgVersion);
pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables); pCreate->cfg.maxTables = htonl(pCreate->cfg.maxTables);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize); pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks); pCreate->cfg.totalBlocks = htonl(pCreate->cfg.totalBlocks);
......
...@@ -518,14 +518,15 @@ typedef struct SRetrieveTableRsp { ...@@ -518,14 +518,15 @@ typedef struct SRetrieveTableRsp {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int32_t cfgVersion; int32_t dbCfgVersion;
int64_t totalStorage; int64_t totalStorage;
int64_t compStorage; int64_t compStorage;
int64_t pointsWritten; int64_t pointsWritten;
uint8_t status; uint8_t status;
uint8_t role; uint8_t role;
uint8_t replica; uint8_t replica;
uint8_t reserved[5]; uint8_t reserved;
int32_t vgCfgVersion;
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
...@@ -641,7 +642,7 @@ typedef struct { ...@@ -641,7 +642,7 @@ typedef struct {
typedef struct { typedef struct {
uint32_t vgId; uint32_t vgId;
int32_t cfgVersion; int32_t dbCfgVersion;
int32_t maxTables; int32_t maxTables;
int32_t cacheBlockSize; int32_t cacheBlockSize;
int32_t totalBlocks; int32_t totalBlocks;
...@@ -660,7 +661,8 @@ typedef struct { ...@@ -660,7 +661,8 @@ typedef struct {
int8_t wals; int8_t wals;
int8_t quorum; int8_t quorum;
int8_t update; int8_t update;
int8_t reserved[15]; int8_t reserved[11];
int32_t vgCfgVersion;
} SVnodeCfg; } SVnodeCfg;
typedef struct { typedef struct {
......
...@@ -144,7 +144,8 @@ typedef struct SVgObj { ...@@ -144,7 +144,8 @@ typedef struct SVgObj {
int8_t status; int8_t status;
int8_t reserved0[4]; int8_t reserved0[4];
SVnodeGid vnodeGid[TSDB_MAX_REPLICA]; SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
int8_t reserved1[12]; int32_t vgCfgVersion;
int8_t reserved1[8];
int8_t updateEnd[4]; int8_t updateEnd[4];
int32_t refCount; int32_t refCount;
int32_t numOfTables; int32_t numOfTables;
...@@ -181,7 +182,7 @@ typedef struct SDbObj { ...@@ -181,7 +182,7 @@ typedef struct SDbObj {
int8_t reserved0[4]; int8_t reserved0[4];
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN];
int64_t createdTime; int64_t createdTime;
int32_t cfgVersion; int32_t dbCfgVersion;
SDbCfg cfg; SDbCfg cfg;
int8_t status; int8_t status;
int8_t reserved1[11]; int8_t reserved1[11];
......
...@@ -55,12 +55,12 @@ typedef enum EDnodeOfflineReason { ...@@ -55,12 +55,12 @@ typedef enum EDnodeOfflineReason {
TAOS_DN_OFF_OTHERS TAOS_DN_OFF_OTHERS
} EDnodeOfflineReason; } EDnodeOfflineReason;
extern char* dnodeStatus[];
extern char* dnodeRoles[];
int32_t mnodeInitDnodes(); int32_t mnodeInitDnodes();
void mnodeCleanupDnodes(); void mnodeCleanupDnodes();
char* mnodeGetDnodeStatusStr(int32_t dnodeStatus);
void mgmtMonitorDnodeModule();
int32_t mnodeGetDnodesNum(); int32_t mnodeGetDnodesNum();
int32_t mnodeGetOnlinDnodesCpuCoreNum(); int32_t mnodeGetOnlinDnodesCpuCoreNum();
int32_t mnodeGetOnlineDnodesNum(); int32_t mnodeGetOnlineDnodesNum();
......
...@@ -1015,7 +1015,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) { ...@@ -1015,7 +1015,7 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SAlterDbMsg *pAlter, void *pMsg) {
if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) { if (memcmp(&newCfg, &pDb->cfg, sizeof(SDbCfg)) != 0) {
pDb->cfg = newCfg; pDb->cfg = newCfg;
pDb->cfgVersion++; pDb->dbCfgVersion++;
SSdbRow row = { SSdbRow row = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsDbSdb, .pTable = tsDbSdb,
......
...@@ -63,7 +63,6 @@ static int32_t mnodeGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC ...@@ -63,7 +63,6 @@ static int32_t mnodeGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); static int32_t mnodeGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole);
static void mnodeUpdateDnodeEps(); static void mnodeUpdateDnodeEps();
static char* offlineReason[] = { static char* offlineReason[] = {
...@@ -557,7 +556,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) { ...@@ -557,7 +556,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
for (int32_t j = 0; j < openVnodes; ++j) { for (int32_t j = 0; j < openVnodes; ++j) {
SVnodeLoad *pVload = &pStatus->load[j]; SVnodeLoad *pVload = &pStatus->load[j];
pVload->vgId = htonl(pVload->vgId); pVload->vgId = htonl(pVload->vgId);
pVload->cfgVersion = htonl(pVload->cfgVersion); pVload->dbCfgVersion = htonl(pVload->dbCfgVersion);
pVload->vgCfgVersion = htonl(pVload->vgCfgVersion);
SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId); SVgObj *pVgroup = mnodeGetVgroup(pVload->vgId);
if (pVgroup == NULL) { if (pVgroup == NULL) {
...@@ -833,12 +833,12 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -833,12 +833,12 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* status = mnodeGetDnodeStatusStr(pDnode->status); char* status = dnodeStatus[pDnode->status];
STR_TO_VARSTR(pWrite, status); STR_TO_VARSTR(pWrite, status);
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char* role = mnodeGetDnodeAlternativeRoleStr(pDnode->alternativeRole); char* role = dnodeRoles[pDnode->alternativeRole];
STR_TO_VARSTR(pWrite, role); STR_TO_VARSTR(pWrite, role);
cols++; cols++;
...@@ -1154,21 +1154,17 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo ...@@ -1154,21 +1154,17 @@ static int32_t mnodeRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, vo
return numOfRows; return numOfRows;
} }
char* mnodeGetDnodeStatusStr(int32_t dnodeStatus) { char* dnodeStatus[] = {
switch (dnodeStatus) { "offline",
case TAOS_DN_STATUS_OFFLINE: return "offline"; "dropping",
case TAOS_DN_STATUS_DROPPING: return "dropping"; "balancing",
case TAOS_DN_STATUS_BALANCING: return "balancing"; "ready",
case TAOS_DN_STATUS_READY: return "ready"; "undefined"
default: return "undefined"; };
}
}
static char* mnodeGetDnodeAlternativeRoleStr(int32_t alternativeRole) { char* dnodeRoles[] = {
switch (alternativeRole) { "any",
case TAOS_DN_ALTERNATIVE_ROLE_ANY: return "any"; "mnode",
case TAOS_DN_ALTERNATIVE_ROLE_MNODE: return "mnode"; "vnode",
case TAOS_DN_ALTERNATIVE_ROLE_VNODE: return "vnode"; "any"
default:return "any"; };
}
}
...@@ -256,6 +256,8 @@ SVgObj *mnodeGetVgroup(int32_t vgId) { ...@@ -256,6 +256,8 @@ SVgObj *mnodeGetVgroup(int32_t vgId) {
} }
void mnodeUpdateVgroup(SVgObj *pVgroup) { void mnodeUpdateVgroup(SVgObj *pVgroup) {
pVgroup->vgCfgVersion++;
SSdbRow row = { SSdbRow row = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.pTable = tsVgroupSdb, .pTable = tsVgroupSdb,
...@@ -339,10 +341,11 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl ...@@ -339,10 +341,11 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
pVgroup->pointsWritten = htobe64(pVload->pointsWritten); pVgroup->pointsWritten = htobe64(pVload->pointsWritten);
} }
if (pVload->cfgVersion != pVgroup->pDb->cfgVersion || pVload->replica != pVgroup->numOfVnodes) { if (pVload->dbCfgVersion != pVgroup->pDb->dbCfgVersion || pVload->replica != pVgroup->numOfVnodes ||
mError("dnode:%d, vgId:%d, vnode cfgVersion:%d repica:%d not match with mnode cfgVersion:%d replica:%d", pVload->vgCfgVersion != pVgroup->vgCfgVersion) {
pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion, mError("dnode:%d, vgId:%d, vnode cfgVersion:%d:%d repica:%d not match with mnode cfgVersion:%d:%d replica:%d",
pVgroup->numOfVnodes); pDnode->dnodeId, pVload->vgId, pVload->dbCfgVersion, pVload->vgCfgVersion, pVload->replica,
pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes);
mnodeSendAlterVgroupMsg(pVgroup); mnodeSendAlterVgroupMsg(pVgroup);
} }
} }
...@@ -840,7 +843,8 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) { ...@@ -840,7 +843,8 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
SVnodeCfg *pCfg = &pVnode->cfg; SVnodeCfg *pCfg = &pVnode->cfg;
pCfg->vgId = htonl(pVgroup->vgId); pCfg->vgId = htonl(pVgroup->vgId);
pCfg->cfgVersion = htonl(pDb->cfgVersion); pCfg->dbCfgVersion = htonl(pDb->dbCfgVersion);
pCfg->vgCfgVersion = htonl(pVgroup->vgCfgVersion);
pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); pCfg->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize);
pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks); pCfg->totalBlocks = htonl(pDb->cfg.totalBlocks);
pCfg->maxTables = htonl(maxTables + 1); pCfg->maxTables = htonl(maxTables + 1);
......
...@@ -56,7 +56,8 @@ typedef struct { ...@@ -56,7 +56,8 @@ typedef struct {
int64_t sync; int64_t sync;
void * events; void * events;
void * cq; // continuous query void * cq; // continuous query
int32_t cfgVersion; int32_t dbCfgVersion;
int32_t vgCfgVersion;
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
SSyncCfg syncCfg; SSyncCfg syncCfg;
SWalCfg walCfg; SWalCfg walCfg;
......
...@@ -22,7 +22,8 @@ ...@@ -22,7 +22,8 @@
static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
tstrncpy(pVnode->db, vnodeMsg->db, sizeof(pVnode->db)); tstrncpy(pVnode->db, vnodeMsg->db, sizeof(pVnode->db));
pVnode->cfgVersion = vnodeMsg->cfg.cfgVersion; pVnode->dbCfgVersion = vnodeMsg->cfg.dbCfgVersion;
pVnode->vgCfgVersion = vnodeMsg->cfg.vgCfgVersion;
pVnode->tsdbCfg.cacheBlockSize = vnodeMsg->cfg.cacheBlockSize; pVnode->tsdbCfg.cacheBlockSize = vnodeMsg->cfg.cacheBlockSize;
pVnode->tsdbCfg.totalBlocks = vnodeMsg->cfg.totalBlocks; pVnode->tsdbCfg.totalBlocks = vnodeMsg->cfg.totalBlocks;
pVnode->tsdbCfg.daysPerFile = vnodeMsg->cfg.daysPerFile; pVnode->tsdbCfg.daysPerFile = vnodeMsg->cfg.daysPerFile;
...@@ -95,12 +96,19 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) { ...@@ -95,12 +96,19 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
} }
tstrncpy(vnodeMsg.db, db->valuestring, sizeof(vnodeMsg.db)); tstrncpy(vnodeMsg.db, db->valuestring, sizeof(vnodeMsg.db));
cJSON *cfgVersion = cJSON_GetObjectItem(root, "cfgVersion"); cJSON *dbCfgVersion = cJSON_GetObjectItem(root, "cfgVersion");
if (!cfgVersion || cfgVersion->type != cJSON_Number) { if (!dbCfgVersion || dbCfgVersion->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, cfgVersion not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, cfgVersion not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR; goto PARSE_VCFG_ERROR;
} }
vnodeMsg.cfg.cfgVersion = cfgVersion->valueint; vnodeMsg.cfg.dbCfgVersion = dbCfgVersion->valueint;
cJSON *vgCfgVersion = cJSON_GetObjectItem(root, "vgCfgVersion");
if (!vgCfgVersion || vgCfgVersion->type != cJSON_Number) {
vError("vgId:%d, failed to read %s, vgCfgVersion not found", pVnode->vgId, file);
goto PARSE_VCFG_ERROR;
}
vnodeMsg.cfg.vgCfgVersion = vgCfgVersion->valueint;
cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize"); cJSON *cacheBlockSize = cJSON_GetObjectItem(root, "cacheBlockSize");
if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) { if (!cacheBlockSize || cacheBlockSize->type != cJSON_Number) {
...@@ -278,7 +286,8 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) { ...@@ -278,7 +286,8 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pMsg->db); len += snprintf(content + len, maxLen - len, " \"db\": \"%s\",\n", pMsg->db);
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pMsg->cfg.cfgVersion); len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pMsg->cfg.dbCfgVersion);
len += snprintf(content + len, maxLen - len, " \"vgCfgVersion\": %d,\n", pMsg->cfg.vgCfgVersion);
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pMsg->cfg.cacheBlockSize); len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pMsg->cfg.cacheBlockSize);
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pMsg->cfg.totalBlocks); len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pMsg->cfg.totalBlocks);
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pMsg->cfg.daysPerFile); len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pMsg->cfg.daysPerFile);
......
...@@ -154,7 +154,7 @@ int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) { ...@@ -154,7 +154,7 @@ int32_t vnodeAlter(void *vparam, SCreateVnodeMsg *pVnodeCfg) {
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg // dbCfgVersion can be corrected by status msg
if (!vnodeSetUpdatingStatus(pVnode)) { if (!vnodeSetUpdatingStatus(pVnode)) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -134,7 +134,8 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) { ...@@ -134,7 +134,8 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId); pLoad->vgId = htonl(pVnode->vgId);
pLoad->cfgVersion = htonl(pVnode->cfgVersion); pLoad->dbCfgVersion = htonl(pVnode->dbCfgVersion);
pLoad->vgCfgVersion = htonl(pVnode->vgCfgVersion);
pLoad->totalStorage = htobe64(totalStorage); pLoad->totalStorage = htobe64(totalStorage);
pLoad->compStorage = htobe64(compStorage); pLoad->compStorage = htobe64(compStorage);
pLoad->pointsWritten = htobe64(pointsWritten); pLoad->pointsWritten = htobe64(pointsWritten);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册