提交 be3de7b7 编写于 作者: S Shengliang Guan

fix(sync): send snapshot to multi nodes at the same time

上级 3d8ebc26
......@@ -1294,6 +1294,14 @@ typedef struct {
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
typedef struct {
int32_t dnodeId;
int8_t standby;
} SSetStandbyReq;
int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
int32_t tDeserializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
typedef struct {
int32_t connId;
int32_t queryId;
......
......@@ -97,6 +97,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_MNODE, "drop-mnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SET_STANDBY, "set-mnode-standby", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_QNODE, "create-qnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL)
......
......@@ -3506,6 +3506,33 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq
return 0;
}
int32_t tSerializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......
......@@ -155,6 +155,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
......@@ -235,6 +236,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY, mmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
code = 0;
......
......@@ -428,7 +428,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
......@@ -445,7 +444,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
......@@ -454,12 +452,14 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_MND_SET_STANDBY) {
code = syncSetStandby(pMgmt->sync);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
}
} else {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
......@@ -493,6 +493,10 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_MND_SET_STANDBY) {
code = syncSetStandby(pMgmt->sync);
SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
......
......@@ -55,6 +55,7 @@ int32_t mndInitMnode(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_SET_STANDBY_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
......@@ -460,6 +461,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
int32_t numOfReplicas = 0;
SDAlterMnodeReq alterReq = {0};
SDDropMnodeReq dropReq = {0};
SSetStandbyReq standbyReq = {0};
SEpSet alterEpset = {0};
SEpSet dropEpSet = {0};
......@@ -494,6 +496,31 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
dropEpSet.eps[0].port = pDnode->port;
memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
standbyReq.dnodeId = pDnode->id;
standbyReq.standby = 1;
{
int32_t contLen = tSerializeSSetStandbyReq(NULL, 0, &standbyReq) + sizeof(SMsgHead);
void *pReq = taosMemoryMalloc(contLen);
tSerializeSSetStandbyReq((char*)pReq + sizeof(SMsgHead), contLen, &standbyReq);
SMsgHead *pHead = pReq;
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(MNODE_HANDLE);
STransAction action = {
.epSet = dropEpSet,
.pCont = pReq,
.contLen = contLen,
.msgType = TDMT_MND_SET_STANDBY,
.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
};
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
return -1;
}
}
{
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
void *pReq = taosMemoryMalloc(contLen);
......
......@@ -35,7 +35,7 @@
#include "syncVoteMgr.h"
#include "tref.h"
bool gRaftDetailLog = false;
bool gRaftDetailLog = true;
static int32_t tsNodeRefId = -1;
......@@ -152,7 +152,7 @@ int32_t syncSetStandby(int64_t rid) {
return -1;
}
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return -1;
}
......@@ -170,6 +170,7 @@ int32_t syncSetStandby(int64_t rid) {
raftCfgPersist(pSyncNode->pRaftCfg);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
sInfo("vgId:%d, set to standby", pSyncNode->vgId);
return 0;
}
......@@ -1157,6 +1158,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
int32_t ret = 0;
// save snapshot senders
int32_t oldReplicaNum = pSyncNode->replicaNum;
SRaftId oldReplicasId[TSDB_MAX_REPLICA];
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
memcpy(oldSenders, pSyncNode->senders, sizeof(oldSenders));
// init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
......@@ -1187,6 +1195,27 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
// reset snapshot senders, memory leak
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
(pSyncNode->senders)[i] = NULL;
}
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
char host[128];
uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event reset sender for %lu, %s:%d", pSyncNode->vgId, (pSyncNode->replicasId)[i].addr, host, port);
(pSyncNode->senders)[i] = oldSenders[j];
}
}
}
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
}
}
bool IamInOld = false;
bool IamInNew = false;
for (int i = 0; i < oldConfig.replicaNum; ++i) {
......@@ -1845,7 +1874,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
bool isDrop;
if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
//if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
if (IamInNew) {
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
// change isStandBy to normal
......@@ -1856,14 +1886,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
syncNodeBecomeFollower(ths, "config change");
}
}
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
} else {
syncNodeBecomeFollower(ths, "config change2");
}
if (gRaftDetailLog) {
char* sOld = syncCfg2Str(&oldSyncCfg);
char* sNew = syncCfg2Str(&newSyncCfg);
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d index:%ld \n", sOld, sNew, isDrop, pEntry->index);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
// always call FpReConfigCb
......
......@@ -755,6 +755,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
sInfo("recv SyncSnapshotRsp maybe replica already dropped");
return 0;
}
// get sender
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
ASSERT(pSender != NULL);
......@@ -788,4 +794,4 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
}
return 0;
}
\ No newline at end of file
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册