提交 972fee7f 编写于 作者: M Minghao Li

refactor(sync): add rpcMsg to reconfig callback

上级 e9d466ec
......@@ -83,8 +83,10 @@ typedef struct SReConfigCbMeta {
SyncTerm term;
SyncTerm currentTerm;
SSyncCfg oldCfg;
SSyncCfg newCfg;
bool isDrop;
uint64_t flag;
uint64_t seqNum;
} SReConfigCbMeta;
typedef struct SSnapshot {
......@@ -106,7 +108,7 @@ typedef struct SSyncFSM {
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
......@@ -184,7 +186,6 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo);
void syncStart(int64_t rid);
void syncStop(int64_t rid);
int32_t syncSetStandby(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
ESyncState syncGetMyRole(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
......@@ -194,8 +195,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart();
const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg);
// to be moved to static
void syncStartNormal(int64_t rid);
......
......@@ -96,10 +96,18 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
}
}
void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
#if 0
// send response
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info);
#endif
pMgmt->errCode = cbMeta.code;
mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
cbMeta.code, cbMeta.index, cbMeta.term);
......
......@@ -180,10 +180,18 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
return 0;
}
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbMeta) {
SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, sync reconfig is confirmed", TD_VID(pVnode));
#if 0
// send response
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
#endif
// todo rpc response here
// build rpc msg
// put into apply queue
......@@ -212,6 +220,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
......
......@@ -401,10 +401,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
......
......@@ -175,23 +175,37 @@ int32_t syncSetStandby(int64_t rid) {
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
int32_t ret = 0;
char* configChange = syncCfg2Str((SSyncCfg*)pSyncCfg);
char* newconfig = syncCfg2Str((SSyncCfg*)pSyncCfg);
if (gRaftDetailLog) {
sInfo("==syncReconfig== newconfig:%s", configChange);
sInfo("==syncReconfig== newconfig:%s", newconfig);
}
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_SYNC_CONFIG_CHANGE;
rpcMsg.info.noResp = 1;
rpcMsg.contLen = strlen(configChange) + 1;
rpcMsg.contLen = strlen(newconfig) + 1;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", configChange);
taosMemoryFree(configChange);
snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", newconfig);
taosMemoryFree(newconfig);
ret = syncPropose(rid, &rpcMsg, false);
return ret;
}
int32_t syncReconfigRaw(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
int32_t ret = 0;
char* newconfig = syncCfg2Str((SSyncCfg*)pNewCfg);
pRpcMsg->msgType = TDMT_SYNC_CONFIG_CHANGE;
pRpcMsg->info.noResp = 1;
pRpcMsg->contLen = strlen(newconfig) + 1;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
snprintf(pRpcMsg->pCont, pRpcMsg->contLen, "%s", newconfig);
taosMemoryFree(newconfig);
return ret;
}
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret;
......@@ -1814,10 +1828,12 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg;
cbMeta.oldCfg = oldSyncCfg;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.flag = 0x11;
cbMeta.isDrop = isDrop;
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
ths->pFsm->FpReConfigCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
......
......@@ -146,7 +146,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
}
......
......@@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
}
......
......@@ -146,8 +146,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb== pFsm:%p", pFsm); }
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
char* s = syncCfg2Str(&newCfg);
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
char* s = syncCfg2Str(&(cbMeta.newCfg));
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s);
taosMemoryFree(s);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册