diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 1b640642d7e3d33ed90402b1926b2302fa7166cb..eeac619105c1ff9bc909833c02d6d4ba2e41352e 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -236,6 +236,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE_FINISH, "sync-config-change-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index dd9fd384cea12cc2d7de37d9bad24a998093d037..e963f25616cc92ebb64a75c9de6daecca50c8116 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,6 +24,8 @@ extern "C" { #include "tdef.h" #include "tmsgcb.h" +extern bool gRaftDetailLog; + #define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_INVALID -1 @@ -61,28 +63,35 @@ typedef struct SSyncCfg { } SSyncCfg; typedef struct SFsmCbMeta { + int32_t code; SyncIndex index; + SyncTerm term; + uint64_t seqNum; SyncIndex lastConfigIndex; - bool isWeak; - int32_t code; ESyncState state; - uint64_t seqNum; - SyncTerm term; SyncTerm currentTerm; + bool isWeak; uint64_t flag; } SFsmCbMeta; typedef struct SReConfigCbMeta { - int32_t code; - SyncIndex index; - SyncTerm term; - SyncIndex lastConfigIndex; - SyncTerm currentTerm; + int32_t code; + SyncIndex index; + SyncTerm term; + uint64_t seqNum; + SyncIndex lastConfigIndex; + ESyncState state; + SyncTerm currentTerm; + bool isWeak; + uint64_t flag; + + // config info SSyncCfg oldCfg; SSyncCfg newCfg; - bool isDrop; - uint64_t flag; - uint64_t seqNum; + SyncIndex newCfgIndex; + SyncTerm newCfgTerm; + uint64_t newCfgSeqNum; + } SReConfigCbMeta; typedef struct SSnapshot { @@ -107,8 +116,7 @@ typedef struct SSyncFSM { void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - - int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void *pReaderParam, void** ppReader); + int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); @@ -189,14 +197,13 @@ ESyncState syncGetMyRole(int64_t rid); bool syncIsReady(int64_t rid); const char* syncGetMyRoleStr(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid); +SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); -int32_t syncGetVgId(int64_t rid); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); - int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); // build SRpcMsg, need to call syncPropose with SRpcMsg diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index fbf124bf4748e716b28cd4e449f7cfdc5f975a84..37b465e56ea18420cdb4880099c4b2f8bdf4c322 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -489,6 +489,40 @@ void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg); void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg); void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg); + +// --------------------------------------------- +typedef struct SyncReconfigFinish { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; + SSyncCfg oldCfg; + SSyncCfg newCfg; + SyncIndex newCfgIndex; + SyncTerm newCfgTerm; + uint64_t newCfgSeqNum; + +} SyncReconfigFinish; + +SyncReconfigFinish* syncReconfigFinishBuild(int32_t vgId); +void syncReconfigFinishDestroy(SyncReconfigFinish* pMsg); +void syncReconfigFinishSerialize(const SyncReconfigFinish* pMsg, char* buf, uint32_t bufLen); +void syncReconfigFinishDeserialize(const char* buf, uint32_t len, SyncReconfigFinish* pMsg); +char* syncReconfigFinishSerialize2(const SyncReconfigFinish* pMsg, uint32_t* len); +SyncReconfigFinish* syncReconfigFinishDeserialize2(const char* buf, uint32_t len); +void syncReconfigFinish2RpcMsg(const SyncReconfigFinish* pMsg, SRpcMsg* pRpcMsg); +void syncReconfigFinishFromRpcMsg(const SRpcMsg* pRpcMsg, SyncReconfigFinish* pMsg); +SyncReconfigFinish* syncReconfigFinishFromRpcMsg2(const SRpcMsg* pRpcMsg); +cJSON* syncReconfigFinish2Json(const SyncReconfigFinish* pMsg); +char* syncReconfigFinish2Str(const SyncReconfigFinish* pMsg); + +// for debug ---------------------- +void syncReconfigFinishPrint(const SyncReconfigFinish* pMsg); +void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg); +void syncReconfigFinishLog(const SyncReconfigFinish* pMsg); +void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg); + + + // on message ---------------------- int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9fb9bcebf7b8ae731b616bc5b151455e248230e7..f4d8c2a663f1f4214434af7e38facd767399e6a1 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -416,6 +416,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C) #define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D) #define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E) +#define TSDB_CODE_SYN_NEW_CONFIG_ERROR TAOS_DEF_ERROR_CODE(0, 0x090F) +#define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910) +#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 95b721b4dd651ffd174e1b0a9bf4b15172af7203..b50781e8a585da2be402c6994371978cd00392dc 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -58,21 +58,21 @@ static void *mndBuildTimerMsg(int32_t *pContLen) { static void mndPullupTrans(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } static void mndCalMqRebalance(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } static void mndPullupTelem(SMnode *pMnode) { int32_t contLen = 0; - void * pReq = mndBuildTimerMsg(&contLen); + void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } @@ -86,8 +86,8 @@ static void mndPushTtlTime(SMnode *pMnode) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); if (pIter == NULL) break; - int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t); - SMsgHead *pHead = rpcMallocCont(contLen); + int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t); + SMsgHead *pHead = rpcMallocCont(contLen); if (pHead == NULL) { mError("ttl time malloc err. contLen:%d", contLen); sdbRelease(pSdb, pVgroup); @@ -97,13 +97,13 @@ static void mndPushTtlTime(SMnode *pMnode) { pHead->vgId = htonl(pVgroup->vgId); int32_t t = taosGetTimestampSec(); - *(int32_t*)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t); + *(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t); SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen}; - SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); + SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); int32_t code = tmsgSendReq(&epSet, &rpcMsg); - if(code != 0){ + if (code != 0) { mError("ttl time seed err. code:%d", code); } mError("ttl time seed succ. time:%d", t); @@ -117,7 +117,7 @@ static void *mndThreadFp(void *param) { setThreadName("mnode-timer"); while (1) { - if (lastTime % (864000) == 0) { // sleep 1 day for ttl + if (lastTime % (864000) == 0) { // sleep 1 day for ttl mndPushTtlTime(pMnode); } @@ -416,7 +416,7 @@ void mndStop(SMnode *pMnode) { } int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { - SMnode * pMnode = pMsg->info.node; + SMnode *pMnode = pMsg->info.node; SSyncMgmt *pMgmt = &pMnode->syncMgmt; int32_t code = 0; @@ -433,15 +433,19 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { return -1; } - char logBuf[512] = {0}; - char *syncNodeStr = sync2SimpleStr(pMgmt->sync); - snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); - static int64_t mndTick = 0; - if (++mndTick % 10 == 1) { - mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); - } - syncRpcMsgLog2(logBuf, pMsg); - taosMemoryFree(syncNodeStr); + do { + char *syncNodeStr = sync2SimpleStr(pMgmt->sync); + static int64_t mndTick = 0; + if (++mndTick % 10 == 1) { + mTrace("vgId:%d, sync heartbeat msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); + } + if (gRaftDetailLog) { + char logBuf[512] = {0}; + snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); + syncRpcMsgLog2(logBuf, pMsg); + } + taosMemoryFree(syncNodeStr); + } while (0); // ToDo: ugly! use function pointer if (syncNodeSnapshotEnable(pSyncNode)) { @@ -580,7 +584,7 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { } int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { - SMnode * pMnode = pMsg->info.node; + SMnode *pMnode = pMsg->info.node; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; if (fp == NULL) { mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); @@ -633,7 +637,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr SMonGrantInfo *pGrantInfo) { if (mndAcquireRpcRef(pMnode) != 0) return -1; - SSdb * pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; int64_t ms = taosGetTimestampMs(); pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); @@ -709,7 +713,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; tstrncpy(desc.status, "unsynced", sizeof(desc.status)); for (int32_t i = 0; i < pVgroup->replica; ++i) { - SVnodeGid * pVgid = &pVgroup->vnodeGid[i]; + SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; pVnDesc->dnode_id = pVgid->dnodeId; tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role)); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 30e46af03ce0b58789dd089fd845d252d55f269c..de6a44d456c992defcdbae39d97b260b471e43a6 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -22,8 +22,8 @@ #include "mndSync.h" #include "mndUser.h" -#define TRANS_VER_NUMBER 1 -#define TRANS_ARRAY_SIZE 8 +#define TRANS_VER_NUMBER 1 +#define TRANS_ARRAY_SIZE 8 #define TRANS_RESERVE_SIZE 64 static SSdbRaw *mndTransActionEncode(STrans *pTrans); @@ -896,7 +896,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) pAction->rawWritten = 0; pAction->msgSent = 0; pAction->msgReceived = 0; - if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NOT_IN_NEW_CONFIG || + if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR || pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) { pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps; mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage), diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 297a486ee95e12a5fda2d14e7d0d2a4f15201682..f1c43512cec80c744f9a89f983ecc3c985e83d15 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { } void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnode * pVnode = pInfo->ahandle; + SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; @@ -174,7 +174,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { } void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnode * pVnode = pInfo->ahandle; + SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; @@ -211,21 +211,23 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); assert(pSyncNode != NULL); - ESyncState state = syncGetMyRole(pVnode->sync); - SyncTerm currentTerm = syncGetMyTerm(pVnode->sync); - SMsgHead *pHead = pMsg->pCont; + STraceId *trace = &pMsg->info.traceId; - char logBuf[512] = {0}; - char *syncNodeStr = sync2SimpleStr(pVnode->sync); - snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr); - static int64_t vndTick = 0; - STraceId * trace = &pMsg->info.traceId; - if (++vndTick % 10 == 1) { - vGTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); - } - syncRpcMsgLog2(logBuf, pMsg); - taosMemoryFree(syncNodeStr); + do { + char *syncNodeStr = sync2SimpleStr(pVnode->sync); + static int64_t vndTick = 0; + if (++vndTick % 10 == 1) { + vGTrace("vgId:%d, sync heartbeat msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); + } + if (gRaftDetailLog) { + char logBuf[512] = {0}; + snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, + syncNodeStr); + syncRpcMsgLog2(logBuf, pMsg); + } + taosMemoryFree(syncNodeStr); + } while (0); SRpcMsg *pRpcMsg = pMsg; @@ -348,7 +350,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon } static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SVnode * pVnode = pFsm->data; + SVnode *pVnode = pFsm->data; SSnapshot snapshot = {0}; SyncIndex beginIndex = SYNC_INDEX_INVALID; char logBuf[256] = {0}; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index dcfea84f369aa804e3899f000bede96ce54c63f1..63db395425c23d2c4ac50fda93b88b974c4787ab 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -159,7 +159,8 @@ typedef struct SSyncNode { SSyncSnapshotSender* senders[TSDB_MAX_REPLICA]; SSyncSnapshotReceiver* pNewNodeReceiver; - // SSnapshotMeta sMeta; + // is config changing + bool changing; } SSyncNode; @@ -198,7 +199,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode); void syncNodeEventLog(const SSyncNode* pSyncNode, char* str); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop); +void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex); SSyncNode* syncNodeAcquire(int64_t rid); void syncNodeRelease(SSyncNode* pNode); @@ -238,12 +239,15 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); -int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); -int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); +int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); +int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); void syncStartNormal(int64_t rid); void syncStartStandBy(int64_t rid); +bool syncNodeCanChange(SSyncNode* pSyncNode); +bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg); + // for debug -------------- void syncNodePrint(SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj); diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index 435ad98fb36f0677d8eb4d67585a33de3e2faae9..7f45276e9f2bb5891e96b22650fd3eddd37a338b 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -49,14 +49,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); -cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); -char *syncCfg2Str(SSyncCfg *pSyncCfg); -char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); +cJSON * syncCfg2Json(SSyncCfg *pSyncCfg); +char * syncCfg2Str(SSyncCfg *pSyncCfg); +char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); -cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); -char *raftCfg2Str(SRaftCfg *pRaftCfg); +cJSON * raftCfg2Json(SRaftCfg *pRaftCfg); +char * raftCfg2Str(SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index f6544dd79db4d508d069b7fc80921a0e55ed804c..069154fb93416c3b90840ed81ea8ac2bebf2d8b3 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -66,10 +66,11 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char typedef struct SSyncSnapshotReceiver { bool start; - int32_t ack; - void *pWriter; - SyncTerm term; - SyncTerm privateTerm; + int32_t ack; + void *pWriter; + SyncTerm term; + SyncTerm privateTerm; + SSnapshot snapshot; SSyncNode *pSyncNode; SRaftId fromId; @@ -78,12 +79,12 @@ typedef struct SSyncSnapshotReceiver { SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId); -bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); -cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg); +bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index e13271341d95a3d9b7178b1551a09caf4b88e94e..599410510de48ad323707c5b0b495fd7b4adbcd2 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -178,6 +178,23 @@ int32_t syncSetStandby(int64_t rid) { return 0; } +bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) { + bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg); + if (!IamInNew) { + return false; + } + + if (pNewCfg->replicaNum > pSyncNode->replicaNum + 1) { + return false; + } + + if (pNewCfg->replicaNum < pSyncNode->replicaNum - 1) { + return false; + } + + return true; +} + int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { @@ -185,13 +202,12 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg return -1; } ASSERT(rid == pSyncNode->rid); - int32_t ret = 0; - bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg); - if (!IamInNew) { + if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; + terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; + sError("syncNodeCheckNewConfig error"); return -1; } @@ -215,12 +231,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { } ASSERT(rid == pSyncNode->rid); - bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg); - - if (!IamInNew) { - sError("sync reconfig error, not in new config"); + if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); - terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; + terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; + sError("syncNodeCheckNewConfig error"); return -1; } @@ -425,28 +439,28 @@ const char* syncGetMyRoleStr(int64_t rid) { return s; } -int32_t syncGetVgId(int64_t rid) { +SyncTerm syncGetMyTerm(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } assert(rid == pSyncNode->rid); - int32_t vgId = pSyncNode->vgId; + SyncTerm term = pSyncNode->pRaftStore->currentTerm; taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return vgId; + return term; } -SyncTerm syncGetMyTerm(int64_t rid) { +SyncGroupId syncGetVgId(int64_t rid) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } assert(rid == pSyncNode->rid); - SyncTerm term = pSyncNode->pRaftStore->currentTerm; + SyncGroupId vgId = pSyncNode->vgId; taosReleaseRef(tsNodeRefId, pSyncNode->rid); - return term; + return vgId; } void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { @@ -589,6 +603,26 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) syncNodeEventLog(pSyncNode, eventLog); if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) { + ret = -1; + terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; + sError("sync propose not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType); + goto _END; + } + + // config change + if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) { + if (!syncNodeCanChange(pSyncNode)) { + ret = -1; + terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY; + sError("sync reconfig not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType); + goto _END; + } + + ASSERT(!pSyncNode->changing); + pSyncNode->changing = true; + } + SRespStub stub; stub.createTime = taosGetTimestampMs(); stub.rpcMsg = *pMsg; @@ -606,12 +640,16 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) sError("syncPropose pSyncNode->FpEqMsg is NULL"); } syncClientRequestDestroy(pSyncMsg); + goto _END; + } else { ret = -1; terrno = TSDB_CODE_SYN_NOT_LEADER; sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); + goto _END; } +_END: return ret; } @@ -825,6 +863,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // snapshot receivers pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID); + // is config changing + pSyncNode->changing = false; + // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); @@ -1253,20 +1294,32 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { int32_t userStrLen = strlen(str); + + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + } + SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + if (userStrLen < 256) { char logBuf[128 + 256]; snprintf(logBuf, sizeof(logBuf), - "vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s", pSyncNode->vgId, - syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, - pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, str); + "vgId:%d, sync %s %s, term:%lu, commit:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, replica-num:%d, " + "lconfig:%ld, changing:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->commitIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, + pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing); sDebug("%s", logBuf); + } else { int len = 128 + userStrLen; char* s = (char*)taosMemoryMalloc(len); - snprintf(s, len, "vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s", - pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, - pSyncNode->commitIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, - pSyncNode->pRaftCfg->lastConfigIndex, str); + snprintf(s, len, + "vgId:%d, sync %s %s, term:%lu, commit:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, replica-num:%d, " + "lconfig:%ld, changing:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->commitIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, + pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing); sDebug("%s", s); taosMemoryFree(s); } @@ -1313,146 +1366,188 @@ bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { return b1; } -void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) { +void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) { SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; pSyncNode->pRaftCfg->cfg = *pNewConfig; pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; - int32_t ret = 0; + bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig); + bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig); - // save snapshot senders - int32_t oldReplicaNum = pSyncNode->replicaNum; - SRaftId oldReplicasId[TSDB_MAX_REPLICA]; - memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId)); - SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { - oldSenders[i] = (pSyncNode->senders)[i]; + bool isDrop = false; + bool isAdd = false; - char* eventLog = snapshotSender2SimpleStr(oldSenders[i], "snapshot sender save old"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); + if (IamInOld && !IamInNew) { + isDrop = true; + } else { + isDrop = false; } - // init internal - pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; - syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); + if (!IamInOld && IamInNew) { + isAdd = true; + } else { + isAdd = false; + } - // init peersNum, peers, peersId - pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; - int j = 0; - for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - if (i != pSyncNode->pRaftCfg->cfg.myIndex) { - pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i]; - j++; - } + if (IamInNew) { + pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal } - for (int i = 0; i < pSyncNode->peersNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); + if (isDrop) { + pSyncNode->pRaftCfg->isStandBy = 1; // set standby } - // init replicaNum, replicasId - pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; - for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { - syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); - } + // persist last config index + raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex); - syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); - syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); - voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); - votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); + if (IamInNew) { + //----------------------------------------- + int32_t ret = 0; + + // save snapshot senders + int32_t oldReplicaNum = pSyncNode->replicaNum; + SRaftId oldReplicasId[TSDB_MAX_REPLICA]; + memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId)); + SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + oldSenders[i] = (pSyncNode->senders)[i]; - pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); + char* eventLog = snapshotSender2SimpleStr(oldSenders[i], "snapshot sender save old"); + syncNodeEventLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } - // reset snapshot senders + // init internal + pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; + syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId); + + // init peersNum, peers, peersId + pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1; + int j = 0; + for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + if (i != pSyncNode->pRaftCfg->cfg.myIndex) { + pSyncNode->peersNodeInfo[j] = pSyncNode->pRaftCfg->cfg.nodeInfo[i]; + j++; + } + } + for (int i = 0; i < pSyncNode->peersNum; ++i) { + syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]); + } - // clear new - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { - (pSyncNode->senders)[i] = NULL; - } + // init replicaNum, replicasId + pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum; + for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { + syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]); + } - // reset new - for (int i = 0; i < pSyncNode->replicaNum; ++i) { - // reset sender - bool reset = false; - for (int j = 0; j < TSDB_MAX_REPLICA; ++j) { - if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) { - char host[128]; - uint16_t port; - syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); + syncIndexMgrUpdate(pSyncNode->pNextIndex, pSyncNode); + syncIndexMgrUpdate(pSyncNode->pMatchIndex, pSyncNode); + voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode); + votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode); - do { - char eventLog[256]; - snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for %lu, newIndex:%d, %s:%d, %p", - (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]); - syncNodeEventLog(pSyncNode, eventLog); - } while (0); + pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum); - (pSyncNode->senders)[i] = oldSenders[j]; - oldSenders[j] = NULL; - reset = true; + // reset snapshot senders - // reset replicaIndex - int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; - (pSyncNode->senders)[i]->replicaIndex = i; + // clear new + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + (pSyncNode->senders)[i] = NULL; + } - do { - char eventLog[256]; - snprintf(eventLog, sizeof(eventLog), "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", - oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); - syncNodeEventLog(pSyncNode, eventLog); - } while (0); + // reset new + for (int i = 0; i < pSyncNode->replicaNum; ++i) { + // reset sender + bool reset = false; + for (int j = 0; j < TSDB_MAX_REPLICA; ++j) { + if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) { + char host[128]; + uint16_t port; + syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); + + do { + char eventLog[256]; + snprintf(eventLog, sizeof(eventLog), "snapshot sender reset for %lu, newIndex:%d, %s:%d, %p", + (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j]); + syncNodeEventLog(pSyncNode, eventLog); + } while (0); + + (pSyncNode->senders)[i] = oldSenders[j]; + oldSenders[j] = NULL; + reset = true; + + // reset replicaIndex + int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; + (pSyncNode->senders)[i]->replicaIndex = i; + + do { + char eventLog[256]; + snprintf(eventLog, sizeof(eventLog), + "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex, i, host, + port, (pSyncNode->senders)[i], reset); + syncNodeEventLog(pSyncNode, eventLog); + } while (0); + } } } - } - // create new - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { - if ((pSyncNode->senders)[i] == NULL) { - (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); + // create new + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + if ((pSyncNode->senders)[i] == NULL) { + (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); - char* eventLog = snapshotSender2SimpleStr((pSyncNode->senders)[i], "snapshot sender create new"); - syncNodeEventLog(pSyncNode, eventLog); - taosMemoryFree(eventLog); + char* eventLog = snapshotSender2SimpleStr((pSyncNode->senders)[i], "snapshot sender create new"); + syncNodeEventLog(pSyncNode, eventLog); + taosMemoryFree(eventLog); + } } - } - // free old - for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (oldSenders[i] != NULL) { - snapshotSenderDestroy(oldSenders[i]); + // free old + for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (oldSenders[i] != NULL) { + snapshotSenderDestroy(oldSenders[i]); - do { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "snapshot sender delete old %p replica-index:%d", oldSenders[i], i); - syncNodeEventLog(pSyncNode, eventLog); - } while (0); + do { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "snapshot sender delete old %p replica-index:%d", oldSenders[i], i); + syncNodeEventLog(pSyncNode, eventLog); + } while (0); - oldSenders[i] = NULL; + oldSenders[i] = NULL; + } } - } - bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig); - bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig); + // persist cfg + raftCfgPersist(pSyncNode->pRaftCfg); - *isDrop = true; - if (IamInOld && !IamInNew) { - *isDrop = true; - } else { - *isDrop = false; - } + char tmpbuf[512]; + char* oldStr = syncCfg2SimpleStr(&oldConfig); + char* newStr = syncCfg2SimpleStr(pNewConfig); + snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldConfig.replicaNum, + pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr); + taosMemoryFree(oldStr); + taosMemoryFree(newStr); - // may be add me to a new raft group - if (IamInOld && IamInNew && oldConfig.replicaNum == 1) { - } + // change isStandBy to normal (election timeout) + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + syncNodeBecomeLeader(pSyncNode, tmpbuf); + } else { + syncNodeBecomeFollower(pSyncNode, tmpbuf); + } + } else { + // persist cfg + raftCfgPersist(pSyncNode->pRaftCfg); - if (IamInNew) { - pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal + char tmpbuf[512]; + char* oldStr = syncCfg2SimpleStr(&oldConfig); + char* newStr = syncCfg2SimpleStr(pNewConfig); + snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%ld, %s --> %s", oldConfig.replicaNum, + pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr); + taosMemoryFree(oldStr); + taosMemoryFree(newStr); + syncNodeEventLog(pSyncNode, tmpbuf); } - raftCfgPersist(pSyncNode->pRaftCfg); - if (gRaftDetailLog) { - syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); - } +_END: + return; } SSyncNode* syncNodeAcquire(int64_t rid) { @@ -2170,9 +2265,54 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { return -1; } -static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { +static int32_t syncNodeConfigChangeFinish(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { + SyncReconfigFinish* pFinish = syncReconfigFinishFromRpcMsg2(pRpcMsg); + ASSERT(pFinish); + + if (ths->pFsm->FpReConfigCb != NULL) { + SReConfigCbMeta cbMeta = {0}; + cbMeta.code = 0; + cbMeta.index = pEntry->index; + cbMeta.term = pEntry->term; + cbMeta.seqNum = pEntry->seqNum; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index); + cbMeta.state = ths->state; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; + cbMeta.isWeak = pEntry->isWeak; + cbMeta.flag = 0; + + cbMeta.oldCfg = pFinish->oldCfg; + cbMeta.newCfg = pFinish->newCfg; + cbMeta.newCfgIndex = pFinish->newCfgIndex; + cbMeta.newCfgTerm = pFinish->newCfgTerm; + cbMeta.newCfgSeqNum = pFinish->newCfgSeqNum; + + ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta); + } + + // update changing + ths->changing = false; + + char tmpbuf[512]; + char* oldStr = syncCfg2SimpleStr(&(pFinish->oldCfg)); + char* newStr = syncCfg2SimpleStr(&(pFinish->newCfg)); + snprintf(tmpbuf, sizeof(tmpbuf), "config change finish from %d to %d, index:%ld, %s --> %s", + pFinish->oldCfg.replicaNum, pFinish->newCfg.replicaNum, pFinish->newCfgIndex, oldStr, newStr); + taosMemoryFree(oldStr); + taosMemoryFree(newStr); + syncNodeEventLog(ths, tmpbuf); + + syncReconfigFinishDestroy(pFinish); + + return 0; +} + +static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry, + SyncReconfigFinish* pFinish) { + // old config SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; + // new config SSyncCfg newSyncCfg; int32_t ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg); ASSERT(ret == 0); @@ -2180,77 +2320,28 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE // update new config myIndex syncNodeUpdateNewConfigIndex(ths, &newSyncCfg); - bool IamInNew = syncNodeInConfig(ths, &newSyncCfg); + // do config change + syncNodeDoConfigChange(ths, &newSyncCfg, pEntry->index); - /* - for (int i = 0; i < newSyncCfg.replicaNum; ++i) { - if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && - ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { - newSyncCfg.myIndex = i; - IamInNew = true; - break; - } - } - */ + // set pFinish + pFinish->oldCfg = oldSyncCfg; + pFinish->newCfg = newSyncCfg; + pFinish->newCfgIndex = pEntry->index; + pFinish->newCfgTerm = pEntry->term; + pFinish->newCfgSeqNum = pEntry->seqNum; - bool isDrop; - - if (IamInNew) { - syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); - - // change isStandBy to normal - if (!isDrop) { - char tmpbuf[512]; - char* oldStr = syncCfg2SimpleStr(&oldSyncCfg); - char* newStr = syncCfg2SimpleStr(&newSyncCfg); - snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum, - newSyncCfg.replicaNum, pEntry->index, oldStr, newStr); - taosMemoryFree(oldStr); - taosMemoryFree(newStr); - - if (ths->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(ths, tmpbuf); - } else { - syncNodeBecomeFollower(ths, tmpbuf); - } - } - } else { - char tmpbuf[512]; - char* oldStr = syncCfg2SimpleStr(&oldSyncCfg); - char* newStr = syncCfg2SimpleStr(&newSyncCfg); - snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum, - newSyncCfg.replicaNum, pEntry->index, oldStr, newStr); - taosMemoryFree(oldStr); - taosMemoryFree(newStr); - - syncNodeBecomeFollower(ths, tmpbuf); - } + return 0; +} - if (gRaftDetailLog) { - char* sOld = syncCfg2Str(&oldSyncCfg); - char* sNew = syncCfg2Str(&newSyncCfg); - sInfo("==config change== 0x11 old:%s new:%s isDrop:%d index:%ld IamInNew:%d \n", sOld, sNew, isDrop, pEntry->index, - IamInNew); - taosMemoryFree(sOld); - taosMemoryFree(sNew); - } +static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFinish* pFinish) { + SRpcMsg rpcMsg; + syncReconfigFinish2RpcMsg(pFinish, &rpcMsg); - // always call FpReConfigCb - if (ths->pFsm->FpReConfigCb != NULL) { - SReConfigCbMeta cbMeta = {0}; - cbMeta.code = 0; - cbMeta.currentTerm = ths->pRaftStore->currentTerm; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index); - cbMeta.term = pEntry->term; - cbMeta.newCfg = newSyncCfg; - cbMeta.oldCfg = oldSyncCfg; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.flag = 0x11; - cbMeta.isDrop = isDrop; - ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta); + int32_t code = syncNodePropose(ths, &rpcMsg, false); + if (code != 0) { + sError("syncNodeProposeConfigChangeFinish error"); + ths->changing = false; } - return 0; } @@ -2292,9 +2383,21 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, // config change if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { - raftCfgAddConfigIndex(ths->pRaftCfg, pEntry->index); - raftCfgPersist(ths->pRaftCfg); - code = syncNodeConfigChange(ths, &rpcMsg, pEntry); + SyncReconfigFinish* pFinish = syncReconfigFinishBuild(ths->vgId); + ASSERT(pFinish != NULL); + + code = syncNodeConfigChange(ths, &rpcMsg, pEntry, pFinish); + ASSERT(code == 0); + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + syncNodeProposeConfigChangeFinish(ths, pFinish); + } + syncReconfigFinishDestroy(pFinish); + } + + // config change finish + if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { + code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry); ASSERT(code == 0); } @@ -2345,3 +2448,28 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) } return pSender; } + +bool syncNodeCanChange(SSyncNode* pSyncNode) { + if (pSyncNode->changing) { + sError("sync cannot change"); + return false; + } + + if ((pSyncNode->commitIndex >= SYNC_INDEX_BEGIN)) { + SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); + if (pSyncNode->commitIndex != lastIndex) { + sError("sync cannot change2"); + return false; + } + } + + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]); + if (pSender->start) { + sError("sync cannot change3"); + return false; + } + } + + return true; +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 23165f67901bfabacf594730d4aef9260b1a1aa0..454609009c8ecd3dfee235c5ea87ecd2a49c0fe2 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -2227,4 +2227,133 @@ void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg) { sTrace("syncLeaderTransferLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); taosMemoryFree(serialized); } +} + +// --------------------------------------------- +SyncReconfigFinish* syncReconfigFinishBuild(int32_t vgId) { + uint32_t bytes = sizeof(SyncReconfigFinish); + SyncReconfigFinish* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_CONFIG_CHANGE_FINISH; + return pMsg; +} + +void syncReconfigFinishDestroy(SyncReconfigFinish* pMsg) { + if (pMsg != NULL) { + taosMemoryFree(pMsg); + } +} + +void syncReconfigFinishSerialize(const SyncReconfigFinish* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncReconfigFinishDeserialize(const char* buf, uint32_t len, SyncReconfigFinish* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +char* syncReconfigFinishSerialize2(const SyncReconfigFinish* pMsg, uint32_t* len) { + char* buf = taosMemoryMalloc(pMsg->bytes); + assert(buf != NULL); + syncReconfigFinishSerialize(pMsg, buf, pMsg->bytes); + if (len != NULL) { + *len = pMsg->bytes; + } + return buf; +} + +SyncReconfigFinish* syncReconfigFinishDeserialize2(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SyncReconfigFinish* pMsg = taosMemoryMalloc(bytes); + assert(pMsg != NULL); + syncReconfigFinishDeserialize(buf, len, pMsg); + assert(len == pMsg->bytes); + return pMsg; +} + +void syncReconfigFinish2RpcMsg(const SyncReconfigFinish* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncReconfigFinishSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncReconfigFinishFromRpcMsg(const SRpcMsg* pRpcMsg, SyncReconfigFinish* pMsg) { + syncReconfigFinishDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +SyncReconfigFinish* syncReconfigFinishFromRpcMsg2(const SRpcMsg* pRpcMsg) { + SyncReconfigFinish* pMsg = syncReconfigFinishDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + assert(pMsg != NULL); + return pMsg; +} + +cJSON* syncReconfigFinish2Json(const SyncReconfigFinish* pMsg) { + char u64buf[128]; + cJSON* pRoot = cJSON_CreateObject(); + + if (pMsg != NULL) { + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pOldCfg = syncCfg2Json((SSyncCfg*)(&(pMsg->oldCfg))); + cJSON* pNewCfg = syncCfg2Json((SSyncCfg*)(&(pMsg->newCfg))); + cJSON_AddItemToObject(pRoot, "oldCfg", pOldCfg); + cJSON_AddItemToObject(pRoot, "newCfg", pNewCfg); + + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->newCfgIndex); + cJSON_AddStringToObject(pRoot, "newCfgIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->newCfgTerm); + cJSON_AddStringToObject(pRoot, "newCfgTerm", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->newCfgSeqNum); + cJSON_AddStringToObject(pRoot, "newCfgSeqNum", u64buf); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncReconfigFinish", pRoot); + return pJson; +} + +char* syncReconfigFinish2Str(const SyncReconfigFinish* pMsg) { + cJSON* pJson = syncReconfigFinish2Json(pMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug ---------------------- +void syncReconfigFinishPrint(const SyncReconfigFinish* pMsg) { + char* serialized = syncReconfigFinish2Str(pMsg); + printf("syncReconfigFinishPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg) { + char* serialized = syncReconfigFinish2Str(pMsg); + printf("syncReconfigFinishPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void syncReconfigFinishLog(const SyncReconfigFinish* pMsg) { + char* serialized = syncReconfigFinish2Str(pMsg); + sTrace("syncReconfigFinishLog | len:%lu | %s", strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) { + if (gRaftDetailLog) { + char* serialized = syncReconfigFinish2Str(pMsg); + sTrace("syncReconfigFinishLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); + } } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index a8242d06e39cbcb30a175cb955454ee56292cb2e..08c3e0126c143acb4196d9196ed8ee09f6678f43 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -96,14 +96,14 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { int32_t len = 512; - char *s = taosMemoryMalloc(len); + char * s = taosMemoryMalloc(len); memset(s, 0, len); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); @@ -196,7 +196,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -262,7 +262,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 029891a692f5112aa47f9da58fc63c935f61294d..c694a0b71539c410dd20c06c3085b2c10aebcbc0 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -21,7 +21,8 @@ #include "syncUtil.h" #include "wal.h" -static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId); +static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, + SyncSnapshotSend *pBeginMsg); //---------------------------------- SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { @@ -341,6 +342,10 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->fromId = fromId; pReceiver->term = pSyncNode->pRaftStore->currentTerm; pReceiver->privateTerm = 0; + pReceiver->snapshot.data = NULL; + pReceiver->snapshot.lastApplyIndex = -1; + pReceiver->snapshot.lastApplyTerm = 0; + pReceiver->snapshot.lastConfigIndex = -1; } else { sInfo("snapshotReceiverCreate cannot create receiver"); @@ -358,11 +363,16 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } // begin receive snapshot msg (current term, seq begin) -static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) { +static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, + SyncSnapshotSend *pBeginMsg) { pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; pReceiver->privateTerm = privateTerm; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; - pReceiver->fromId = fromId; + pReceiver->fromId = pBeginMsg->srcId; + + pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; + pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm; + pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex; ASSERT(pReceiver->pWriter == NULL); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); @@ -371,10 +381,10 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if already start, force close, start again -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) { +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) { if (!snapshotReceiverIsStart(pReceiver)) { // start - snapshotReceiverDoStart(pReceiver, privateTerm, fromId); + snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); pReceiver->start = true; } else { @@ -388,7 +398,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer pReceiver->pWriter = NULL; // start again - snapshotReceiverDoStart(pReceiver, privateTerm, fromId); + snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg); pReceiver->start = true; } @@ -449,6 +459,15 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId); cJSON_AddItemToObject(pRoot, "fromId", pFromId); + snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastApplyIndex); + cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastApplyTerm); + cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastConfigIndex); + cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term); cJSON_AddStringToObject(pRoot, "term", u64buf); @@ -477,8 +496,9 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) uint16_t port; syncUtilU642Addr(fromId.addr, host, sizeof(host), &port); - snprintf(s, len, "%s %p start:%d ack:%d term:%lu pterm:%lu %s:%d ", event, pReceiver, pReceiver->start, - pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port); + snprintf(s, len, "%s %p start:%d ack:%d term:%lu pterm:%lu from:%s:%d laindex:%ld laterm:%lu lcindex:%ld", event, + pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port, + pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex); return s; } @@ -495,7 +515,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { // begin - snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId); + snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg); pReceiver->ack = pMsg->seq; needRsp = true; @@ -519,42 +539,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { // update new config myIndex SSyncCfg newSyncCfg = pMsg->lastConfig; syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg); - bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg); - - bool isDrop = false; - if (IamInNew) { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), - "update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld", pMsg->lastIndex, - pMsg->lastTerm, pMsg->lastConfigIndex); - syncNodeEventLog(pSyncNode, eventLog); - - syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); - - } else { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), - "do not update config by snapshot, not in new, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld", - pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex); - syncNodeEventLog(pSyncNode, eventLog); - } - - // change isStandBy to normal - if (!isDrop) { - char tmpbuf[512]; - char *oldStr = syncCfg2SimpleStr(&oldSyncCfg); - char *newStr = syncCfg2SimpleStr(&newSyncCfg); - snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s", - oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr); - taosMemoryFree(oldStr); - taosMemoryFree(newStr); - - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode, tmpbuf); - } else { - syncNodeBecomeFollower(pSyncNode, tmpbuf); - } - } + + // do config change + syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex); } SSnapshot snapshot; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index cbc1298113ca630101205ea8da8eeadebb54c744..1d1ff7ae53682b3b47badc7e04084de89463b2f3 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -261,23 +261,29 @@ bool syncUtilIsData(tmsg_t msgType) { #endif bool syncUtilUserPreCommit(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH && + msgType != TDMT_SYNC_LEADER_TRANSFER) { return true; } + return false; } bool syncUtilUserCommit(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH && + msgType != TDMT_SYNC_LEADER_TRANSFER) { return true; } + return false; } bool syncUtilUserRollback(tmsg_t msgType) { - if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) { + if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH && + msgType != TDMT_SYNC_LEADER_TRANSFER) { return true; } + return false; } diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 725343e3735401a0d87e1124d1e2313f6159c993..2057aa23a4d07e492a70164017b5111b434218ff 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -49,6 +49,7 @@ add_executable(syncRaftLogTest "") add_executable(syncRaftLogTest2 "") add_executable(syncRaftLogTest3 "") add_executable(syncLeaderTransferTest "") +add_executable(syncReconfigFinishTest "") target_sources(syncTest @@ -255,6 +256,10 @@ target_sources(syncLeaderTransferTest PRIVATE "syncLeaderTransferTest.cpp" ) +target_sources(syncReconfigFinishTest + PRIVATE + "syncReconfigFinishTest.cpp" +) target_include_directories(syncTest @@ -512,6 +517,11 @@ target_include_directories(syncLeaderTransferTest "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncReconfigFinishTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -718,6 +728,10 @@ target_link_libraries(syncLeaderTransferTest sync gtest_main ) +target_link_libraries(syncReconfigFinishTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 6fd694427308b09e3ef6dac2dade8ea9c990e41e..b4f173ff021e0d1dc7704bbef4c77e25fd24294c 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -147,8 +147,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { - sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", - cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, + cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); } SSyncFSM* createFsm() { diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index a185870a3b17b06aa8cf161f8a4ac137ba106051..c96e337378ca20323d37882e3bc73093e1d1cdb0 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -78,8 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { - sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", - cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, + cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); } SSyncFSM* createFsm() { diff --git a/source/libs/sync/test/syncRaftCfgTest.cpp b/source/libs/sync/test/syncRaftCfgTest.cpp index 8171d266d4eb26cd6f7b72a8d617f20be7846048..0f111ef22c2575a38e07658eef60fe3b0656d06d 100644 --- a/source/libs/sync/test/syncRaftCfgTest.cpp +++ b/source/libs/sync/test/syncRaftCfgTest.cpp @@ -137,6 +137,6 @@ int main() { test3(); test4(); test5(); - + return 0; } diff --git a/source/libs/sync/test/syncReconfigFinishTest.cpp b/source/libs/sync/test/syncReconfigFinishTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..22e22bb5621d5942f78d49d320483667a13bbec1 --- /dev/null +++ b/source/libs/sync/test/syncReconfigFinishTest.cpp @@ -0,0 +1,135 @@ +#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SSyncCfg* createSyncOldCfg() { + SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg)); + memset(pCfg, 0, sizeof(SSyncCfg)); + + pCfg->replicaNum = 3; + pCfg->myIndex = 1; + for (int i = 0; i < pCfg->replicaNum; ++i) { + ((pCfg->nodeInfo)[i]).nodePort = i * 100; + snprintf(((pCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i); + } + + return pCfg; +} + +SSyncCfg* createSyncNewCfg() { + SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg)); + memset(pCfg, 0, sizeof(SSyncCfg)); + + pCfg->replicaNum = 3; + pCfg->myIndex = 1; + for (int i = 0; i < pCfg->replicaNum; ++i) { + ((pCfg->nodeInfo)[i]).nodePort = i * 100; + snprintf(((pCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->nodeInfo)[i]).nodeFqdn), "500.600.700.%d", i); + } + + return pCfg; +} + +SyncReconfigFinish *createMsg() { + SyncReconfigFinish *pMsg = syncReconfigFinishBuild(1234); + + SSyncCfg* pOld = createSyncOldCfg(); + SSyncCfg* pNew = createSyncNewCfg(); + pMsg->oldCfg = *pOld; + pMsg->newCfg = *pNew; + + pMsg->newCfgIndex = 11; + pMsg->newCfgTerm = 22; + pMsg->newCfgSeqNum = 33; + + taosMemoryFree(pOld); + taosMemoryFree(pNew); + + return pMsg; +} + + +void test1() { + SyncReconfigFinish *pMsg = createMsg(); + syncReconfigFinishLog2((char *)"test1:", pMsg); + syncReconfigFinishDestroy(pMsg); +} + + +void test2() { + SyncReconfigFinish *pMsg = createMsg(); + uint32_t len = pMsg->bytes; + char * serialized = (char *)taosMemoryMalloc(len); + syncReconfigFinishSerialize(pMsg, serialized, len); + SyncReconfigFinish *pMsg2 = syncReconfigFinishBuild(1000); + syncReconfigFinishDeserialize(serialized, len, pMsg2); + syncReconfigFinishLog2((char *)"test2: syncReconfigFinishSerialize -> syncReconfigFinishDeserialize ", pMsg2); + + taosMemoryFree(serialized); + syncReconfigFinishDestroy(pMsg); + syncReconfigFinishDestroy(pMsg2); +} + +void test3() { + SyncReconfigFinish *pMsg = createMsg(); + uint32_t len; + char * serialized = syncReconfigFinishSerialize2(pMsg, &len); + SyncReconfigFinish *pMsg2 = syncReconfigFinishDeserialize2(serialized, len); + syncReconfigFinishLog2((char *)"test3: SyncReconfigFinishSerialize2 -> syncReconfigFinishDeserialize2 ", pMsg2); + + taosMemoryFree(serialized); + syncReconfigFinishDestroy(pMsg); + syncReconfigFinishDestroy(pMsg2); +} + +void test4() { + SyncReconfigFinish *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncReconfigFinish2RpcMsg(pMsg, &rpcMsg); + SyncReconfigFinish *pMsg2 = (SyncReconfigFinish *)taosMemoryMalloc(rpcMsg.contLen); + syncReconfigFinishFromRpcMsg(&rpcMsg, pMsg2); + syncReconfigFinishLog2((char *)"test4: syncReconfigFinish2RpcMsg -> syncReconfigFinishFromRpcMsg ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncReconfigFinishDestroy(pMsg); + syncReconfigFinishDestroy(pMsg2); +} + +void test5() { + SyncReconfigFinish *pMsg = createMsg(); + SRpcMsg rpcMsg; + syncReconfigFinish2RpcMsg(pMsg, &rpcMsg); + SyncReconfigFinish *pMsg2 = syncReconfigFinishFromRpcMsg2(&rpcMsg); + syncReconfigFinishLog2((char *)"test5: syncReconfigFinish2RpcMsg -> syncReconfigFinishFromRpcMsg2 ", pMsg2); + + rpcFreeCont(rpcMsg.pCont); + syncReconfigFinishDestroy(pMsg); + syncReconfigFinishDestroy(pMsg2); +} + +int main() { + gRaftDetailLog = true; + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + logTest(); + + test1(); + test2(); + test3(); + test4(); + test5(); + + return 0; +} diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 74dfaa192a4efbdb8cbd4c8f9a592944f6a093f5..91a16cc0337bad490c5f19c59a4ca6f48acd3eea 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -148,8 +148,8 @@ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFini void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { char* s = syncCfg2Str(&(cbMeta.newCfg)); - sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s", - cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s); + sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s", + cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s); taosMemoryFree(s); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 7f949e5b2743d987421edb4ff09f3393ac662f96..6afb240ab91b52c603cd3009c57d8b3c708f7ad0 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -423,6 +423,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RECONFIG_NOT_READY, "Sync not ready for reconfig") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") // wal