提交 3573ae15 编写于 作者: M Minghao Li

refactor(sync): block when changing

上级 e441bd26
...@@ -199,7 +199,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode); ...@@ -199,7 +199,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode);
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str); void syncNodeEventLog(const SSyncNode* pSyncNode, char* str);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop); void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
SSyncNode* syncNodeAcquire(int64_t rid); SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode); void syncNodeRelease(SSyncNode* pNode);
......
...@@ -1366,146 +1366,188 @@ bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { ...@@ -1366,146 +1366,188 @@ bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
return b1; return b1;
} }
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) { void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
pSyncNode->pRaftCfg->cfg = *pNewConfig; pSyncNode->pRaftCfg->cfg = *pNewConfig;
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
int32_t ret = 0; bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
// save snapshot senders bool isDrop = false;
int32_t oldReplicaNum = pSyncNode->replicaNum; bool isAdd = false;
SRaftId oldReplicasId[TSDB_MAX_REPLICA];
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
char* eventLog = snapshotSender2SimpleStr(oldSenders[i], "snapshot sender save old"); if (IamInOld && !IamInNew) {
syncNodeEventLog(pSyncNode, eventLog); isDrop = true;
taosMemoryFree(eventLog); } else {
isDrop = false;
} }
// init internal if (!IamInOld && IamInNew) {
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; isAdd = true;
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); } else {
isAdd = false;
}
// init peersNum, peers, peersId if (IamInNew) {
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
int j = 0;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
j++;
}
} }
for (int i = 0; i < pSyncNode->peersNum; ++i) { if (isDrop) {
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); pSyncNode->pRaftCfg->isStandBy = 1; // set standby
} }
// init replicaNum, replicasId // persist last config index
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
}
syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); if (IamInNew) {
syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); //-----------------------------------------
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); int32_t ret = 0;
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
// save snapshot senders
int32_t oldReplicaNum = pSyncNode->replicaNum;
SRaftId oldReplicasId[TSDB_MAX_REPLICA];
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); char* eventLog = snapshotSender2SimpleStr(oldSenders[i], "snapshot sender save old");
syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog);
}
// reset snapshot senders // init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
// init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
int j = 0;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
if (i != pSyncNode->pRaftCfg->cfg.myIndex) {
pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i];
j++;
}
}
for (int i = 0; i < pSyncNode->peersNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
}
// clear new // init replicaNum, replicasId
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
(pSyncNode->senders)[i] = NULL; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
} syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
}
// reset new syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode);
for (int i = 0; i < pSyncNode->replicaNum; ++i) { syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode);
// reset sender voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
bool reset = false; votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
char host[128];
uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
do { pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
char eventLog[256];
snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for %lu, newIndex:%d, %s:%d, %p",
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
syncNodeEventLog(pSyncNode, eventLog);
} while (0);
(pSyncNode->senders)[i] = oldSenders[j]; // reset snapshot senders
oldSenders[j] = NULL;
reset = true;
// reset replicaIndex // clear new
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
(pSyncNode->senders)[i]->replicaIndex = i; (pSyncNode->senders)[i] = NULL;
}
do { // reset new
char eventLog[256]; for (int i = 0; i < pSyncNode->replicaNum; ++i) {
snprintf(eventLog, sizeof(eventLog), "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", // reset sender
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); bool reset = false;
syncNodeEventLog(pSyncNode, eventLog); for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
} while (0); if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
char host[128];
uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
do {
char eventLog[256];
snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for %lu, newIndex:%d, %s:%d, %p",
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
syncNodeEventLog(pSyncNode, eventLog);
} while (0);
(pSyncNode->senders)[i] = oldSenders[j];
oldSenders[j] = NULL;
reset = true;
// reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i;
do {
char eventLog[256];
snprintf(eventLog, sizeof(eventLog),
"snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex, i, host,
port, (pSyncNode->senders)[i], reset);
syncNodeEventLog(pSyncNode, eventLog);
} while (0);
}
} }
} }
}
// create new // create new
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) { if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
char* eventLog = snapshotSender2SimpleStr((pSyncNode->senders)[i], "snapshot sender create new"); char* eventLog = snapshotSender2SimpleStr((pSyncNode->senders)[i], "snapshot sender create new");
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
taosMemoryFree(eventLog); taosMemoryFree(eventLog);
}
} }
}
// free old // free old
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) { if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]); snapshotSenderDestroy(oldSenders[i]);
do { do {
char eventLog[128]; char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "snapshot sender delete old %p replica-index:%d", oldSenders[i], i); snprintf(eventLog, sizeof(eventLog), "snapshot sender delete old %p replica-index:%d", oldSenders[i], i);
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
} while (0); } while (0);
oldSenders[i] = NULL; oldSenders[i] = NULL;
}
} }
}
bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig); // persist
bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig); raftCfgPersist(pSyncNode->pRaftCfg);
*isDrop = true; char tmpbuf[512];
if (IamInOld && !IamInNew) { char* oldStr = syncCfg2SimpleStr(&oldConfig);
*isDrop = true; char* newStr = syncCfg2SimpleStr(pNewConfig);
} else { snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldConfig.replicaNum,
*isDrop = false; pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
} taosMemoryFree(oldStr);
taosMemoryFree(newStr);
// may be add me to a new raft group // change isStandBy to normal (election timeout)
if (IamInOld && IamInNew && oldConfig.replicaNum == 1) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
} syncNodeBecomeLeader(pSyncNode, tmpbuf);
} else {
syncNodeBecomeFollower(pSyncNode, tmpbuf);
}
} else {
// persist
raftCfgPersist(pSyncNode->pRaftCfg);
if (IamInNew) { char tmpbuf[512];
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal char* oldStr = syncCfg2SimpleStr(&oldConfig);
char* newStr = syncCfg2SimpleStr(pNewConfig);
snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%ld, %s --> %s", oldConfig.replicaNum,
pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
syncNodeEventLog(pSyncNode, tmpbuf);
} }
raftCfgPersist(pSyncNode->pRaftCfg);
if (gRaftDetailLog) { _END:
syncNodeLog2("==syncNodeDoConfigChange==", pSyncNode); return;
}
} }
SSyncNode* syncNodeAcquire(int64_t rid) { SSyncNode* syncNodeAcquire(int64_t rid) {
...@@ -2275,47 +2317,11 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2275,47 +2317,11 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg); int32_t ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg);
ASSERT(ret == 0); ASSERT(ret == 0);
// persist last config index
raftCfgAddConfigIndex(ths->pRaftCfg, pEntry->index);
raftCfgPersist(ths->pRaftCfg);
// update new config myIndex // update new config myIndex
syncNodeUpdateNewConfigIndex(ths, &newSyncCfg); syncNodeUpdateNewConfigIndex(ths, &newSyncCfg);
bool isDrop = false; // do config change
bool IamInNew = syncNodeInConfig(ths, &newSyncCfg); syncNodeDoConfigChange(ths, &newSyncCfg, pEntry->index);
if (IamInNew) {
// do config change
syncNodeDoConfigChange(ths, &newSyncCfg, pEntry->index, &isDrop);
// change isStandBy to normal
if (!isDrop) {
char tmpbuf[512];
char* oldStr = syncCfg2SimpleStr(&oldSyncCfg);
char* newStr = syncCfg2SimpleStr(&newSyncCfg);
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths, tmpbuf);
} else {
syncNodeBecomeFollower(ths, tmpbuf);
}
}
} else {
char tmpbuf[512];
char* oldStr = syncCfg2SimpleStr(&oldSyncCfg);
char* newStr = syncCfg2SimpleStr(&newSyncCfg);
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
syncNodeBecomeFollower(ths, tmpbuf);
}
// set pFinish // set pFinish
pFinish->oldCfg = oldSyncCfg; pFinish->oldCfg = oldSyncCfg;
......
...@@ -539,42 +539,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -539,42 +539,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// update new config myIndex // update new config myIndex
SSyncCfg newSyncCfg = pMsg->lastConfig; SSyncCfg newSyncCfg = pMsg->lastConfig;
syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg); syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg);
// do config change
bool isDrop = false; syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
if (IamInNew) {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog),
"update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld", pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex);
syncNodeEventLog(pSyncNode, eventLog);
syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog),
"do not update config by snapshot, not in new, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld",
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
syncNodeEventLog(pSyncNode, eventLog);
}
// change isStandBy to normal
if (!isDrop) {
char tmpbuf[512];
char *oldStr = syncCfg2SimpleStr(&oldSyncCfg);
char *newStr = syncCfg2SimpleStr(&newSyncCfg);
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s",
oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, tmpbuf);
} else {
syncNodeBecomeFollower(pSyncNode, tmpbuf);
}
}
} }
SSnapshot snapshot; SSnapshot snapshot;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册