提交 77b03ac8 编写于 作者: M Minghao Li

refactor(sync): add restore finish when become leader again

上级 58051169
......@@ -24,7 +24,7 @@ extern "C" {
#include "tdef.h"
#include "tmsgcb.h"
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
typedef uint64_t SyncNodeId;
......
......@@ -221,17 +221,17 @@ void mndCleanupSync(SMnode *pMnode) {
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
rsp.pCont = rpcMallocCont(rsp.contLen);
if (rsp.pCont == NULL) return -1;
memcpy(rsp.pCont, pRaw, rsp.contLen);
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
req.pCont = rpcMallocCont(req.contLen);
if (req.pCont == NULL) return -1;
memcpy(req.pCont, pRaw, req.contLen);
pMgmt->errCode = 0;
pMgmt->transId = transId;
mTrace("trans:%d, will be proposed", pMgmt->transId);
const bool isWeak = false;
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
if (code == 0) {
tsem_wait(&pMgmt->syncSem);
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
......@@ -242,7 +242,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
terrno = TSDB_CODE_APP_ERROR;
}
rpcFreeCont(rsp.pCont);
rpcFreeCont(req.pCont);
if (code != 0) {
mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code);
return code;
......
......@@ -195,6 +195,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop);
SSyncNode* syncNodeAcquire(int64_t rid);
......@@ -230,6 +231,8 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);
bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId);
SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId);
......
......@@ -185,7 +185,9 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
ASSERT(rid == pSyncNode->rid);
int32_t ret = 0;
bool IamInNew = false;
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
......@@ -201,6 +203,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
}
*/
}
#endif
if (!IamInNew) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......@@ -228,7 +231,9 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
}
ASSERT(rid == pSyncNode->rid);
bool IamInNew = false;
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
......@@ -247,6 +252,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
}
*/
}
#endif
if (!IamInNew) {
sError("sync reconfig error, not in new config");
......@@ -556,7 +562,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return -1;
}
assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType);
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
......@@ -565,7 +571,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0;
sDebug("vgId:%d sync event propose msgType:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
sDebug("vgId:%d sync event propose msgType:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub;
......@@ -1256,9 +1262,36 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
return s;
}
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) {
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
bool b1 = false;
bool b2 = false;
for (int i = 0; i < config->replicaNum; ++i) {
if (strcmp((config->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(config->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
b1 = true;
break;
}
}
for (int i = 0; i < config->replicaNum; ++i) {
SRaftId raftId;
raftId.addr = syncUtilAddr2U64((config->nodeInfo)[i].nodeFqdn, (config->nodeInfo)[i].nodePort);
raftId.vgId = pSyncNode->vgId;
if (syncUtilSameId(&raftId, &(pSyncNode->myRaftId))) {
b2 = true;
break;
}
}
ASSERT(b1 == b2);
return b1;
}
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) {
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
pSyncNode->pRaftCfg->cfg = *newConfig;
pSyncNode->pRaftCfg->cfg = *pNewConfig;
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
int32_t ret = 0;
......@@ -1270,7 +1303,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d sync event save senders %d, %p", pSyncNode->vgId, i, oldSenders[i]);
sDebug("vgId:%d sync event save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, i, oldSenders[i],
oldSenders[i]->privateTerm);
if (gRaftDetailLog) {
;
}
......@@ -1322,8 +1356,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
char host[128];
uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p", pSyncNode->vgId,
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]);
sDebug("vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", pSyncNode->vgId,
(pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], oldSenders[j]->privateTerm);
(pSyncNode->senders)[i] = oldSenders[j];
oldSenders[j] = NULL;
reset = true;
......@@ -1341,7 +1375,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug("vgId:%d sync event create new sender %p replicaIndex:%d", pSyncNode->vgId, (pSyncNode->senders)[i], i);
sDebug("vgId:%d sync event create new sender %p replicaIndex:%d, privateTerm:%lu", pSyncNode->vgId,
(pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
}
}
......@@ -1354,8 +1389,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
}
}
bool IamInOld = false;
bool IamInNew = false;
bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
#if 0
for (int i = 0; i < oldConfig.replicaNum; ++i) {
if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
......@@ -1371,6 +1408,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
break;
}
}
#endif
*isDrop = true;
if (IamInOld && !IamInNew) {
......@@ -1379,6 +1417,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
*isDrop = false;
}
// may be add me to a new raft group
if (IamInOld && IamInNew && oldConfig.replicaNum == 1) {
}
if (IamInNew) {
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
}
......@@ -1404,14 +1446,17 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid)
void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) {
raftStoreSetTerm(pSyncNode->pRaftStore, term);
syncNodeBecomeFollower(pSyncNode, "update term");
char tmpBuf[64];
snprintf(tmpBuf, sizeof(tmpBuf), "update term to %lu", term);
syncNodeBecomeFollower(pSyncNode, tmpBuf);
raftStoreClearVote(pSyncNode->pRaftStore);
}
}
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug("vgId:%d sync event become follower, isStandBy:%d, replicaNum:%d, %s", pSyncNode->vgId,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr);
sDebug("vgId:%d sync event become follower, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// maybe clear leader cache
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
......@@ -1445,8 +1490,12 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
sDebug("vgId:%d sync event become leader, isStandBy:%d, replicaNum:%d %s", pSyncNode->vgId,
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, debugStr);
// reset restoreFinish
pSyncNode->restoreFinish = false;
sDebug("vgId:%d sync event become leader, isStandBy:%d, currentTerm:%lu, replicaNum:%d, restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftStore->currentTerm, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
......@@ -2028,11 +2077,11 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
pSyncLeaderTransfer->newLeaderId.addr);
*/
sDebug("vgId:%d sync event, begin leader transfer", ths->vgId);
sDebug("vgId:%d sync event begin leader transfer", ths->vgId);
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId,
sDebug("vgId:%d sync event maybe leader transfer to %s:%d %lu", ths->vgId,
pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr);
......@@ -2067,6 +2116,21 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
return 0;
}
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
SRaftId raftId;
raftId.addr = syncUtilAddr2U64((pNewCfg->nodeInfo)[i].nodeFqdn, (pNewCfg->nodeInfo)[i].nodePort);
raftId.vgId = ths->vgId;
if (syncUtilSameId(&(ths->myRaftId), &raftId)) {
pNewCfg->myIndex = i;
return 0;
}
}
return -1;
}
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
......@@ -2075,19 +2139,23 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
ASSERT(ret == 0);
// update new config myIndex
bool IamInNew = false;
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
IamInNew = true;
break;
}
}
syncNodeUpdateNewConfigIndex(ths, &newSyncCfg);
bool IamInNew = syncNodeInConfig(ths, &newSyncCfg);
/*
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
IamInNew = true;
break;
}
}
*/
bool isDrop;
// if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
if (IamInNew) {
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
......@@ -2186,7 +2254,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sDebug("vgId:%d sync event restore finish, index:%ld", ths->vgId, pEntry->index);
sDebug("vgId:%d sync event restore finish, %s, index:%ld", ths->vgId, syncUtilState2String(ths->state),
pEntry->index);
}
}
......
......@@ -162,9 +162,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true);
sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s", pData->pSyncNode->vgId,
pEntry->index, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType));
sDebug("vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d",
pData->pSyncNode->vgId, pEntry->index, syncUtilState2String(pData->pSyncNode->state),
pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pEntry->msgType,
TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code;
}
......
......@@ -45,8 +45,9 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event resp mgr add, type:%s seq:%lu handle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle);
sDebug("vgId:%d sync event resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex));
return keyCode;
......@@ -69,8 +70,9 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event resp mgr get, type:%s seq:%lu handle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle);
sDebug("vgId:%d sync event resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex));
return 1; // get one object
......@@ -87,8 +89,9 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event resp mgr get and del, type:%s seq:%lu handle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle);
sDebug("vgId:%d sync event resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId,
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
pStub->rpcMsg.info.ahandle);
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
taosThreadMutexUnlock(&(pObj->mutex));
......
......@@ -591,6 +591,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
int32_t oldReplicaNum = pSyncNode->replicaNum;
// update new config myIndex
SSyncCfg newSyncCfg = pMsg->lastConfig;
syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg);
#if 0
// update new config myIndex
bool IamInNew = false;
SSyncCfg newSyncCfg = pMsg->lastConfig;
......@@ -602,6 +608,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
break;
}
}
#endif
bool isDrop;
if (IamInNew) {
......
......@@ -297,7 +297,7 @@ void usage(char* exe) {
SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
SRpcMsg* pMsg = (SRpcMsg*)taosMemoryMalloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->msgType = TDMT_VND_SUBMIT;
pMsg->contLen = 256;
pMsg->pCont = rpcMallocCont(pMsg->contLen);
snprintf((char*)(pMsg->pCont), pMsg->contLen, "value-myIndex:%u-%d-%d-%ld", myIndex, i, count, taosGetTimestampMs());
......@@ -384,8 +384,10 @@ int main(int argc, char** argv) {
leaderTransferWait++;
if (leaderTransferWait == 7) {
sTrace("begin leader transfer ...");
int32_t ret = syncLeaderTransfer(rid);
if (leaderTransfer) {
sTrace("begin leader transfer ...");
int32_t ret = syncLeaderTransfer(rid);
}
}
if (alreadySend < writeRecordNum) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册