未验证 提交 55a79f25 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14031 from taosdata/feature/3.0_mhli

refactor(sync): propose fail when changing
...@@ -236,6 +236,7 @@ enum { ...@@ -236,6 +236,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_COMMON_RESPONSE, "sync-common-response", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_APPLY_MSG, "sync-apply-msg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE, "sync-config-change", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_CONFIG_CHANGE_FINISH, "sync-config-change-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_SEND, "sync-snapshot-send", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SNAPSHOT_RSP, "sync-snapshot-rsp", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL)
......
...@@ -24,6 +24,8 @@ extern "C" { ...@@ -24,6 +24,8 @@ extern "C" {
#include "tdef.h" #include "tdef.h"
#include "tmsgcb.h" #include "tmsgcb.h"
extern bool gRaftDetailLog;
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1 #define SYNC_INDEX_INVALID -1
...@@ -61,14 +63,14 @@ typedef struct SSyncCfg { ...@@ -61,14 +63,14 @@ typedef struct SSyncCfg {
} SSyncCfg; } SSyncCfg;
typedef struct SFsmCbMeta { typedef struct SFsmCbMeta {
int32_t code;
SyncIndex index; SyncIndex index;
SyncTerm term;
uint64_t seqNum;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
bool isWeak;
int32_t code;
ESyncState state; ESyncState state;
uint64_t seqNum;
SyncTerm term;
SyncTerm currentTerm; SyncTerm currentTerm;
bool isWeak;
uint64_t flag; uint64_t flag;
} SFsmCbMeta; } SFsmCbMeta;
...@@ -76,13 +78,20 @@ typedef struct SReConfigCbMeta { ...@@ -76,13 +78,20 @@ typedef struct SReConfigCbMeta {
int32_t code; int32_t code;
SyncIndex index; SyncIndex index;
SyncTerm term; SyncTerm term;
uint64_t seqNum;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
ESyncState state;
SyncTerm currentTerm; SyncTerm currentTerm;
bool isWeak;
uint64_t flag;
// config info
SSyncCfg oldCfg; SSyncCfg oldCfg;
SSyncCfg newCfg; SSyncCfg newCfg;
bool isDrop; SyncIndex newCfgIndex;
uint64_t flag; SyncTerm newCfgTerm;
uint64_t seqNum; uint64_t newCfgSeqNum;
} SReConfigCbMeta; } SReConfigCbMeta;
typedef struct SSnapshot { typedef struct SSnapshot {
...@@ -107,8 +116,7 @@ typedef struct SSyncFSM { ...@@ -107,8 +116,7 @@ typedef struct SSyncFSM {
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta);
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void *pReaderParam, void** ppReader);
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
...@@ -189,14 +197,13 @@ ESyncState syncGetMyRole(int64_t rid); ...@@ -189,14 +197,13 @@ ESyncState syncGetMyRole(int64_t rid);
bool syncIsReady(int64_t rid); bool syncIsReady(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid); const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid);
SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncGetVgId(int64_t rid);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);
// build SRpcMsg, need to call syncPropose with SRpcMsg // build SRpcMsg, need to call syncPropose with SRpcMsg
......
...@@ -489,6 +489,40 @@ void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg); ...@@ -489,6 +489,40 @@ void syncLeaderTransferPrint2(char* s, const SyncLeaderTransfer* pMsg);
void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg); void syncLeaderTransferLog(const SyncLeaderTransfer* pMsg);
void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg); void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg);
// ---------------------------------------------
typedef struct SyncReconfigFinish {
uint32_t bytes;
int32_t vgId;
uint32_t msgType;
SSyncCfg oldCfg;
SSyncCfg newCfg;
SyncIndex newCfgIndex;
SyncTerm newCfgTerm;
uint64_t newCfgSeqNum;
} SyncReconfigFinish;
SyncReconfigFinish* syncReconfigFinishBuild(int32_t vgId);
void syncReconfigFinishDestroy(SyncReconfigFinish* pMsg);
void syncReconfigFinishSerialize(const SyncReconfigFinish* pMsg, char* buf, uint32_t bufLen);
void syncReconfigFinishDeserialize(const char* buf, uint32_t len, SyncReconfigFinish* pMsg);
char* syncReconfigFinishSerialize2(const SyncReconfigFinish* pMsg, uint32_t* len);
SyncReconfigFinish* syncReconfigFinishDeserialize2(const char* buf, uint32_t len);
void syncReconfigFinish2RpcMsg(const SyncReconfigFinish* pMsg, SRpcMsg* pRpcMsg);
void syncReconfigFinishFromRpcMsg(const SRpcMsg* pRpcMsg, SyncReconfigFinish* pMsg);
SyncReconfigFinish* syncReconfigFinishFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncReconfigFinish2Json(const SyncReconfigFinish* pMsg);
char* syncReconfigFinish2Str(const SyncReconfigFinish* pMsg);
// for debug ----------------------
void syncReconfigFinishPrint(const SyncReconfigFinish* pMsg);
void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog(const SyncReconfigFinish* pMsg);
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
// on message ---------------------- // on message ----------------------
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
......
...@@ -416,6 +416,9 @@ int32_t* taosGetErrno(); ...@@ -416,6 +416,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C) #define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C)
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D) #define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D)
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E) #define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E)
#define TSDB_CODE_SYN_NEW_CONFIG_ERROR TAOS_DEF_ERROR_CODE(0, 0x090F)
#define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq // tq
......
...@@ -58,21 +58,21 @@ static void *mndBuildTimerMsg(int32_t *pContLen) { ...@@ -58,21 +58,21 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
static void mndPullupTrans(SMnode *pMnode) { static void mndPullupTrans(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void * pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
static void mndCalMqRebalance(SMnode *pMnode) { static void mndCalMqRebalance(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void * pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
static void mndPullupTelem(SMnode *pMnode) { static void mndPullupTelem(SMnode *pMnode) {
int32_t contLen = 0; int32_t contLen = 0;
void * pReq = mndBuildTimerMsg(&contLen); void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
} }
...@@ -97,13 +97,13 @@ static void mndPushTtlTime(SMnode *pMnode) { ...@@ -97,13 +97,13 @@ static void mndPushTtlTime(SMnode *pMnode) {
pHead->vgId = htonl(pVgroup->vgId); pHead->vgId = htonl(pVgroup->vgId);
int32_t t = taosGetTimestampSec(); int32_t t = taosGetTimestampSec();
*(int32_t*)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t); *(int32_t *)(POINTER_SHIFT(pHead, sizeof(SMsgHead))) = htonl(t);
SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen}; SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen};
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t code = tmsgSendReq(&epSet, &rpcMsg); int32_t code = tmsgSendReq(&epSet, &rpcMsg);
if(code != 0){ if (code != 0) {
mError("ttl time seed err. code:%d", code); mError("ttl time seed err. code:%d", code);
} }
mError("ttl time seed succ. time:%d", t); mError("ttl time seed succ. time:%d", t);
...@@ -416,7 +416,7 @@ void mndStop(SMnode *pMnode) { ...@@ -416,7 +416,7 @@ void mndStop(SMnode *pMnode) {
} }
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SMnode * pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
int32_t code = 0; int32_t code = 0;
...@@ -433,15 +433,19 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -433,15 +433,19 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
return -1; return -1;
} }
char logBuf[512] = {0}; do {
char *syncNodeStr = sync2SimpleStr(pMgmt->sync); char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t mndTick = 0; static int64_t mndTick = 0;
if (++mndTick % 10 == 1) { if (++mndTick % 10 == 1) {
mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); mTrace("vgId:%d, sync heartbeat msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
} }
if (gRaftDetailLog) {
char logBuf[512] = {0};
snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg); syncRpcMsgLog2(logBuf, pMsg);
}
taosMemoryFree(syncNodeStr); taosMemoryFree(syncNodeStr);
} while (0);
// ToDo: ugly! use function pointer // ToDo: ugly! use function pointer
if (syncNodeSnapshotEnable(pSyncNode)) { if (syncNodeSnapshotEnable(pSyncNode)) {
...@@ -580,7 +584,7 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { ...@@ -580,7 +584,7 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
} }
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
SMnode * pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)]; MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
if (fp == NULL) { if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
...@@ -633,7 +637,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr ...@@ -633,7 +637,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
SMonGrantInfo *pGrantInfo) { SMonGrantInfo *pGrantInfo) {
if (mndAcquireRpcRef(pMnode) != 0) return -1; if (mndAcquireRpcRef(pMnode) != 0) return -1;
SSdb * pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int64_t ms = taosGetTimestampMs(); int64_t ms = taosGetTimestampMs();
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc)); pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
...@@ -709,7 +713,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr ...@@ -709,7 +713,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries; pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
tstrncpy(desc.status, "unsynced", sizeof(desc.status)); tstrncpy(desc.status, "unsynced", sizeof(desc.status));
for (int32_t i = 0; i < pVgroup->replica; ++i) { for (int32_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid * pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
SMonVnodeDesc *pVnDesc = &desc.vnodes[i]; SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
pVnDesc->dnode_id = pVgid->dnodeId; pVnDesc->dnode_id = pVgid->dnodeId;
tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role)); tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
......
...@@ -896,7 +896,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) ...@@ -896,7 +896,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
pAction->rawWritten = 0; pAction->rawWritten = 0;
pAction->msgSent = 0; pAction->msgSent = 0;
pAction->msgReceived = 0; pAction->msgReceived = 0;
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NOT_IN_NEW_CONFIG || if (pAction->errCode == TSDB_CODE_RPC_REDIRECT || pAction->errCode == TSDB_CODE_SYN_NEW_CONFIG_ERROR ||
pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) { pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) {
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps; pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage), mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
......
...@@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
} }
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode * pVnode = pInfo->ahandle; SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
...@@ -174,7 +174,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -174,7 +174,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
} }
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode * pVnode = pInfo->ahandle; SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
...@@ -211,21 +211,23 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -211,21 +211,23 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
ESyncState state = syncGetMyRole(pVnode->sync);
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
SMsgHead *pHead = pMsg->pCont; SMsgHead *pHead = pMsg->pCont;
STraceId *trace = &pMsg->info.traceId;
char logBuf[512] = {0}; do {
char *syncNodeStr = sync2SimpleStr(pVnode->sync); char *syncNodeStr = sync2SimpleStr(pVnode->sync);
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
static int64_t vndTick = 0; static int64_t vndTick = 0;
STraceId * trace = &pMsg->info.traceId;
if (++vndTick % 10 == 1) { if (++vndTick % 10 == 1) {
vGTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr); vGTrace("vgId:%d, sync heartbeat msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
} }
if (gRaftDetailLog) {
char logBuf[512] = {0};
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType,
syncNodeStr);
syncRpcMsgLog2(logBuf, pMsg); syncRpcMsgLog2(logBuf, pMsg);
}
taosMemoryFree(syncNodeStr); taosMemoryFree(syncNodeStr);
} while (0);
SRpcMsg *pRpcMsg = pMsg; SRpcMsg *pRpcMsg = pMsg;
...@@ -348,7 +350,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon ...@@ -348,7 +350,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
} }
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode * pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
char logBuf[256] = {0}; char logBuf[256] = {0};
......
...@@ -159,7 +159,8 @@ typedef struct SSyncNode { ...@@ -159,7 +159,8 @@ typedef struct SSyncNode {
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA]; SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
SSyncSnapshotReceiver* pNewNodeReceiver; SSyncSnapshotReceiver* pNewNodeReceiver;
// SSnapshotMeta sMeta; // is config changing
bool changing;
} SSyncNode; } SSyncNode;
...@@ -198,7 +199,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode); ...@@ -198,7 +199,7 @@ char* syncNode2Str(const SSyncNode* pSyncNode);
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str); void syncNodeEventLog(const SSyncNode* pSyncNode, char* str);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex, bool* isDrop); void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
SSyncNode* syncNodeAcquire(int64_t rid); SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode); void syncNodeRelease(SSyncNode* pNode);
...@@ -244,6 +245,9 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, str ...@@ -244,6 +245,9 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, str
void syncStartNormal(int64_t rid); void syncStartNormal(int64_t rid);
void syncStartStandBy(int64_t rid); void syncStartStandBy(int64_t rid);
bool syncNodeCanChange(SSyncNode* pSyncNode);
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg);
// for debug -------------- // for debug --------------
void syncNodePrint(SSyncNode* pObj); void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj); void syncNodePrint2(char* s, SSyncNode* pObj);
......
...@@ -49,14 +49,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg); ...@@ -49,14 +49,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg); char * syncCfg2Str(SSyncCfg *pSyncCfg);
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg); char * raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
......
...@@ -70,6 +70,7 @@ typedef struct SSyncSnapshotReceiver { ...@@ -70,6 +70,7 @@ typedef struct SSyncSnapshotReceiver {
void *pWriter; void *pWriter;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
SSnapshot snapshot;
SSyncNode *pSyncNode; SSyncNode *pSyncNode;
SRaftId fromId; SRaftId fromId;
...@@ -78,7 +79,7 @@ typedef struct SSyncSnapshotReceiver { ...@@ -78,7 +79,7 @@ typedef struct SSyncSnapshotReceiver {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId); SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
......
...@@ -178,6 +178,23 @@ int32_t syncSetStandby(int64_t rid) { ...@@ -178,6 +178,23 @@ int32_t syncSetStandby(int64_t rid) {
return 0; return 0;
} }
bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pNewCfg) {
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
if (!IamInNew) {
return false;
}
if (pNewCfg->replicaNum > pSyncNode->replicaNum + 1) {
return false;
}
if (pNewCfg->replicaNum < pSyncNode->replicaNum - 1) {
return false;
}
return true;
}
int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) { int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
...@@ -185,13 +202,12 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg ...@@ -185,13 +202,12 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
return -1; return -1;
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
int32_t ret = 0; int32_t ret = 0;
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg);
if (!IamInNew) { if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("syncNodeCheckNewConfig error");
return -1; return -1;
} }
...@@ -215,12 +231,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) { ...@@ -215,12 +231,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
} }
ASSERT(rid == pSyncNode->rid); ASSERT(rid == pSyncNode->rid);
bool IamInNew = syncNodeInConfig(pSyncNode, pNewCfg); if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) {
if (!IamInNew) {
sError("sync reconfig error, not in new config");
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
terrno = TSDB_CODE_SYN_NOT_IN_NEW_CONFIG; terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR;
sError("syncNodeCheckNewConfig error");
return -1; return -1;
} }
...@@ -425,28 +439,28 @@ const char* syncGetMyRoleStr(int64_t rid) { ...@@ -425,28 +439,28 @@ const char* syncGetMyRoleStr(int64_t rid) {
return s; return s;
} }
int32_t syncGetVgId(int64_t rid) { SyncTerm syncGetMyTerm(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR; return TAOS_SYNC_STATE_ERROR;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
int32_t vgId = pSyncNode->vgId; SyncTerm term = pSyncNode->pRaftStore->currentTerm;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return vgId; return term;
} }
SyncTerm syncGetMyTerm(int64_t rid) { SyncGroupId syncGetVgId(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR; return TAOS_SYNC_STATE_ERROR;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
SyncTerm term = pSyncNode->pRaftStore->currentTerm; SyncGroupId vgId = pSyncNode->vgId;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return term; return vgId;
} }
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
...@@ -589,6 +603,26 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) ...@@ -589,6 +603,26 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
ret = -1;
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sError("sync propose not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
goto _END;
}
// config change
if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
if (!syncNodeCanChange(pSyncNode)) {
ret = -1;
terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
sError("sync reconfig not ready, type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
goto _END;
}
ASSERT(!pSyncNode->changing);
pSyncNode->changing = true;
}
SRespStub stub; SRespStub stub;
stub.createTime = taosGetTimestampMs(); stub.createTime = taosGetTimestampMs();
stub.rpcMsg = *pMsg; stub.rpcMsg = *pMsg;
...@@ -606,12 +640,16 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) ...@@ -606,12 +640,16 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
sError("syncPropose pSyncNode->FpEqMsg is NULL"); sError("syncPropose pSyncNode->FpEqMsg is NULL");
} }
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
goto _END;
} else { } else {
ret = -1; ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state)); sError("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
goto _END;
} }
_END:
return ret; return ret;
} }
...@@ -825,6 +863,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -825,6 +863,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot receivers // snapshot receivers
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID); pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID);
// is config changing
pSyncNode->changing = false;
// start in syncNodeStart // start in syncNodeStart
// start raft // start raft
// syncNodeBecomeFollower(pSyncNode); // syncNodeBecomeFollower(pSyncNode);
...@@ -1253,20 +1294,32 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { ...@@ -1253,20 +1294,32 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
int32_t userStrLen = strlen(str); int32_t userStrLen = strlen(str);
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
}
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
if (userStrLen < 256) { if (userStrLen < 256) {
char logBuf[128 + 256]; char logBuf[128 + 256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s", pSyncNode->vgId, "vgId:%d, sync %s %s, term:%lu, commit:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, replica-num:%d, "
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, "lconfig:%ld, changing:%d",
pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, str); pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing);
sDebug("%s", logBuf); sDebug("%s", logBuf);
} else { } else {
int len = 128 + userStrLen; int len = 128 + userStrLen;
char* s = (char*)taosMemoryMalloc(len); char* s = (char*)taosMemoryMalloc(len);
snprintf(s, len, "vgId:%d %s term:%lu commit:%ld standby:%d replica-num:%d lconfig:%ld sync event %s", snprintf(s, len,
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, "vgId:%d, sync %s %s, term:%lu, commit:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, replica-num:%d, "
pSyncNode->commitIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, "lconfig:%ld, changing:%d",
pSyncNode->pRaftCfg->lastConfigIndex, str); pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing);
sDebug("%s", s); sDebug("%s", s);
taosMemoryFree(s); taosMemoryFree(s);
} }
...@@ -1313,11 +1366,41 @@ bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) { ...@@ -1313,11 +1366,41 @@ bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config) {
return b1; return b1;
} }
void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex, bool* isDrop) { void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex lastConfigChangeIndex) {
SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg; SSyncCfg oldConfig = pSyncNode->pRaftCfg->cfg;
pSyncNode->pRaftCfg->cfg = *pNewConfig; pSyncNode->pRaftCfg->cfg = *pNewConfig;
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
bool isDrop = false;
bool isAdd = false;
if (IamInOld && !IamInNew) {
isDrop = true;
} else {
isDrop = false;
}
if (!IamInOld && IamInNew) {
isAdd = true;
} else {
isAdd = false;
}
if (IamInNew) {
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
}
if (isDrop) {
pSyncNode->pRaftCfg->isStandBy = 1; // set standby
}
// persist last config index
raftCfgAddConfigIndex(pSyncNode->pRaftCfg, lastConfigChangeIndex);
if (IamInNew) {
//-----------------------------------------
int32_t ret = 0; int32_t ret = 0;
// save snapshot senders // save snapshot senders
...@@ -1397,8 +1480,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1397,8 +1480,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
do { do {
char eventLog[256]; char eventLog[256];
snprintf(eventLog, sizeof(eventLog), "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", snprintf(eventLog, sizeof(eventLog),
oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); "snapshot sender udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", oldreplicaIndex, i, host,
port, (pSyncNode->senders)[i], reset);
syncNodeEventLog(pSyncNode, eventLog); syncNodeEventLog(pSyncNode, eventLog);
} while (0); } while (0);
} }
...@@ -1431,28 +1515,39 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1431,28 +1515,39 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
} }
} }
bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig); // persist cfg
bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig); raftCfgPersist(pSyncNode->pRaftCfg);
*isDrop = true;
if (IamInOld && !IamInNew) {
*isDrop = true;
} else {
*isDrop = false;
}
// may be add me to a new raft group char tmpbuf[512];
if (IamInOld && IamInNew && oldConfig.replicaNum == 1) { char* oldStr = syncCfg2SimpleStr(&oldConfig);
} char* newStr = syncCfg2SimpleStr(pNewConfig);
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldConfig.replicaNum,
pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
if (IamInNew) { // change isStandBy to normal (election timeout)
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, tmpbuf);
} else {
syncNodeBecomeFollower(pSyncNode, tmpbuf);
} }
} else {
// persist cfg
raftCfgPersist(pSyncNode->pRaftCfg); raftCfgPersist(pSyncNode->pRaftCfg);
if (gRaftDetailLog) { char tmpbuf[512];
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode); char* oldStr = syncCfg2SimpleStr(&oldConfig);
char* newStr = syncCfg2SimpleStr(pNewConfig);
snprintf(tmpbuf, sizeof(tmpbuf), "do not config change from %d to %d, index:%ld, %s --> %s", oldConfig.replicaNum,
pNewConfig->replicaNum, lastConfigChangeIndex, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
syncNodeEventLog(pSyncNode, tmpbuf);
} }
_END:
return;
} }
SSyncNode* syncNodeAcquire(int64_t rid) { SSyncNode* syncNodeAcquire(int64_t rid) {
...@@ -2170,90 +2265,86 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { ...@@ -2170,90 +2265,86 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) {
return -1; return -1;
} }
static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { static int32_t syncNodeConfigChangeFinish(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg; SyncReconfigFinish* pFinish = syncReconfigFinishFromRpcMsg2(pRpcMsg);
ASSERT(pFinish);
SSyncCfg newSyncCfg;
int32_t ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg);
ASSERT(ret == 0);
// update new config myIndex if (ths->pFsm->FpReConfigCb != NULL) {
syncNodeUpdateNewConfigIndex(ths, &newSyncCfg); SReConfigCbMeta cbMeta = {0};
cbMeta.code = 0;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index);
cbMeta.state = ths->state;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.isWeak = pEntry->isWeak;
cbMeta.flag = 0;
bool IamInNew = syncNodeInConfig(ths, &newSyncCfg); cbMeta.oldCfg = pFinish->oldCfg;
cbMeta.newCfg = pFinish->newCfg;
cbMeta.newCfgIndex = pFinish->newCfgIndex;
cbMeta.newCfgTerm = pFinish->newCfgTerm;
cbMeta.newCfgSeqNum = pFinish->newCfgSeqNum;
/* ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta);
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
IamInNew = true;
break;
}
} }
*/
bool isDrop;
if (IamInNew) { // update changing
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop); ths->changing = false;
// change isStandBy to normal
if (!isDrop) {
char tmpbuf[512]; char tmpbuf[512];
char* oldStr = syncCfg2SimpleStr(&oldSyncCfg); char* oldStr = syncCfg2SimpleStr(&(pFinish->oldCfg));
char* newStr = syncCfg2SimpleStr(&newSyncCfg); char* newStr = syncCfg2SimpleStr(&(pFinish->newCfg));
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum, snprintf(tmpbuf, sizeof(tmpbuf), "config change finish from %d to %d, index:%ld, %s --> %s",
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr); pFinish->oldCfg.replicaNum, pFinish->newCfg.replicaNum, pFinish->newCfgIndex, oldStr, newStr);
taosMemoryFree(oldStr); taosMemoryFree(oldStr);
taosMemoryFree(newStr); taosMemoryFree(newStr);
syncNodeEventLog(ths, tmpbuf);
if (ths->state == TAOS_SYNC_STATE_LEADER) { syncReconfigFinishDestroy(pFinish);
syncNodeBecomeLeader(ths, tmpbuf);
} else {
syncNodeBecomeFollower(ths, tmpbuf);
}
}
} else {
char tmpbuf[512];
char* oldStr = syncCfg2SimpleStr(&oldSyncCfg);
char* newStr = syncCfg2SimpleStr(&newSyncCfg);
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, index:%ld, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, pEntry->index, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
syncNodeBecomeFollower(ths, tmpbuf); return 0;
} }
if (gRaftDetailLog) { static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry,
char* sOld = syncCfg2Str(&oldSyncCfg); SyncReconfigFinish* pFinish) {
char* sNew = syncCfg2Str(&newSyncCfg); // old config
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d index:%ld IamInNew:%d \n", sOld, sNew, isDrop, pEntry->index, SSyncCfg oldSyncCfg = ths->pRaftCfg->cfg;
IamInNew);
taosMemoryFree(sOld);
taosMemoryFree(sNew);
}
// always call FpReConfigCb // new config
if (ths->pFsm->FpReConfigCb != NULL) { SSyncCfg newSyncCfg;
SReConfigCbMeta cbMeta = {0}; int32_t ret = syncCfgFromStr(pRpcMsg->pCont, &newSyncCfg);
cbMeta.code = 0; ASSERT(ret == 0);
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index; // update new config myIndex
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index); syncNodeUpdateNewConfigIndex(ths, &newSyncCfg);
cbMeta.term = pEntry->term;
cbMeta.newCfg = newSyncCfg; // do config change
cbMeta.oldCfg = oldSyncCfg; syncNodeDoConfigChange(ths, &newSyncCfg, pEntry->index);
cbMeta.seqNum = pEntry->seqNum;
cbMeta.flag = 0x11; // set pFinish
cbMeta.isDrop = isDrop; pFinish->oldCfg = oldSyncCfg;
ths->pFsm->FpReConfigCb(ths->pFsm, pRpcMsg, cbMeta); pFinish->newCfg = newSyncCfg;
} pFinish->newCfgIndex = pEntry->index;
pFinish->newCfgTerm = pEntry->term;
pFinish->newCfgSeqNum = pEntry->seqNum;
return 0; return 0;
} }
static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFinish* pFinish) {
SRpcMsg rpcMsg;
syncReconfigFinish2RpcMsg(pFinish, &rpcMsg);
int32_t code = syncNodePropose(ths, &rpcMsg, false);
if (code != 0) {
sError("syncNodeProposeConfigChangeFinish error");
ths->changing = false;
}
return 0;
}
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0; int32_t code = 0;
ESyncState state = flag; ESyncState state = flag;
...@@ -2292,9 +2383,21 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -2292,9 +2383,21 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
// config change // config change
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
raftCfgAddConfigIndex(ths->pRaftCfg, pEntry->index); SyncReconfigFinish* pFinish = syncReconfigFinishBuild(ths->vgId);
raftCfgPersist(ths->pRaftCfg); ASSERT(pFinish != NULL);
code = syncNodeConfigChange(ths, &rpcMsg, pEntry);
code = syncNodeConfigChange(ths, &rpcMsg, pEntry, pFinish);
ASSERT(code == 0);
if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeProposeConfigChangeFinish(ths, pFinish);
}
syncReconfigFinishDestroy(pFinish);
}
// config change finish
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry);
ASSERT(code == 0); ASSERT(code == 0);
} }
...@@ -2345,3 +2448,28 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) ...@@ -2345,3 +2448,28 @@ SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId)
} }
return pSender; return pSender;
} }
bool syncNodeCanChange(SSyncNode* pSyncNode) {
if (pSyncNode->changing) {
sError("sync cannot change");
return false;
}
if ((pSyncNode->commitIndex >= SYNC_INDEX_BEGIN)) {
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
if (pSyncNode->commitIndex != lastIndex) {
sError("sync cannot change2");
return false;
}
}
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]);
if (pSender->start) {
sError("sync cannot change3");
return false;
}
}
return true;
}
\ No newline at end of file
...@@ -2228,3 +2228,132 @@ void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg) { ...@@ -2228,3 +2228,132 @@ void syncLeaderTransferLog2(char* s, const SyncLeaderTransfer* pMsg) {
taosMemoryFree(serialized); taosMemoryFree(serialized);
} }
} }
// ---------------------------------------------
SyncReconfigFinish* syncReconfigFinishBuild(int32_t vgId) {
uint32_t bytes = sizeof(SyncReconfigFinish);
SyncReconfigFinish* pMsg = taosMemoryMalloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->vgId = vgId;
pMsg->msgType = TDMT_SYNC_CONFIG_CHANGE_FINISH;
return pMsg;
}
void syncReconfigFinishDestroy(SyncReconfigFinish* pMsg) {
if (pMsg != NULL) {
taosMemoryFree(pMsg);
}
}
void syncReconfigFinishSerialize(const SyncReconfigFinish* pMsg, char* buf, uint32_t bufLen) {
assert(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
}
void syncReconfigFinishDeserialize(const char* buf, uint32_t len, SyncReconfigFinish* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
}
char* syncReconfigFinishSerialize2(const SyncReconfigFinish* pMsg, uint32_t* len) {
char* buf = taosMemoryMalloc(pMsg->bytes);
assert(buf != NULL);
syncReconfigFinishSerialize(pMsg, buf, pMsg->bytes);
if (len != NULL) {
*len = pMsg->bytes;
}
return buf;
}
SyncReconfigFinish* syncReconfigFinishDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncReconfigFinish* pMsg = taosMemoryMalloc(bytes);
assert(pMsg != NULL);
syncReconfigFinishDeserialize(buf, len, pMsg);
assert(len == pMsg->bytes);
return pMsg;
}
void syncReconfigFinish2RpcMsg(const SyncReconfigFinish* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncReconfigFinishSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
}
void syncReconfigFinishFromRpcMsg(const SRpcMsg* pRpcMsg, SyncReconfigFinish* pMsg) {
syncReconfigFinishDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
}
SyncReconfigFinish* syncReconfigFinishFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncReconfigFinish* pMsg = syncReconfigFinishDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
assert(pMsg != NULL);
return pMsg;
}
cJSON* syncReconfigFinish2Json(const SyncReconfigFinish* pMsg) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
if (pMsg != NULL) {
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON* pOldCfg = syncCfg2Json((SSyncCfg*)(&(pMsg->oldCfg)));
cJSON* pNewCfg = syncCfg2Json((SSyncCfg*)(&(pMsg->newCfg)));
cJSON_AddItemToObject(pRoot, "oldCfg", pOldCfg);
cJSON_AddItemToObject(pRoot, "newCfg", pNewCfg);
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->newCfgIndex);
cJSON_AddStringToObject(pRoot, "newCfgIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->newCfgTerm);
cJSON_AddStringToObject(pRoot, "newCfgTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->newCfgSeqNum);
cJSON_AddStringToObject(pRoot, "newCfgSeqNum", u64buf);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncReconfigFinish", pRoot);
return pJson;
}
char* syncReconfigFinish2Str(const SyncReconfigFinish* pMsg) {
cJSON* pJson = syncReconfigFinish2Json(pMsg);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------------
void syncReconfigFinishPrint(const SyncReconfigFinish* pMsg) {
char* serialized = syncReconfigFinish2Str(pMsg);
printf("syncReconfigFinishPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncReconfigFinishPrint2(char* s, const SyncReconfigFinish* pMsg) {
char* serialized = syncReconfigFinish2Str(pMsg);
printf("syncReconfigFinishPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void syncReconfigFinishLog(const SyncReconfigFinish* pMsg) {
char* serialized = syncReconfigFinish2Str(pMsg);
sTrace("syncReconfigFinishLog | len:%lu | %s", strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg) {
if (gRaftDetailLog) {
char* serialized = syncReconfigFinish2Str(pMsg);
sTrace("syncReconfigFinishLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
}
\ No newline at end of file
...@@ -96,14 +96,14 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { ...@@ -96,14 +96,14 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg); cJSON *pJson = syncCfg2Json(pSyncCfg);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
int32_t len = 512; int32_t len = 512;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
memset(s, 0, len); memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
...@@ -196,7 +196,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { ...@@ -196,7 +196,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg); cJSON *pJson = raftCfg2Json(pRaftCfg);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -262,7 +262,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { ...@@ -262,7 +262,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
} }
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0); ASSERT(code == 0);
......
...@@ -21,7 +21,8 @@ ...@@ -21,7 +21,8 @@
#include "syncUtil.h" #include "syncUtil.h"
#include "wal.h" #include "wal.h"
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId); static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
SyncSnapshotSend *pBeginMsg);
//---------------------------------- //----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
...@@ -341,6 +342,10 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from ...@@ -341,6 +342,10 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->fromId = fromId; pReceiver->fromId = fromId;
pReceiver->term = pSyncNode->pRaftStore->currentTerm; pReceiver->term = pSyncNode->pRaftStore->currentTerm;
pReceiver->privateTerm = 0; pReceiver->privateTerm = 0;
pReceiver->snapshot.data = NULL;
pReceiver->snapshot.lastApplyIndex = -1;
pReceiver->snapshot.lastApplyTerm = 0;
pReceiver->snapshot.lastConfigIndex = -1;
} else { } else {
sInfo("snapshotReceiverCreate cannot create receiver"); sInfo("snapshotReceiverCreate cannot create receiver");
...@@ -358,11 +363,16 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { ...@@ -358,11 +363,16 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// begin receive snapshot msg (current term, seq begin) // begin receive snapshot msg (current term, seq begin)
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) { static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm,
SyncSnapshotSend *pBeginMsg) {
pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm;
pReceiver->privateTerm = privateTerm; pReceiver->privateTerm = privateTerm;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
pReceiver->fromId = fromId; pReceiver->fromId = pBeginMsg->srcId;
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
ASSERT(pReceiver->pWriter == NULL); ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter)); int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
...@@ -371,10 +381,10 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p ...@@ -371,10 +381,10 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver // if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again // if already start, force close, start again
void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SRaftId fromId) { void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTerm, SyncSnapshotSend *pBeginMsg) {
if (!snapshotReceiverIsStart(pReceiver)) { if (!snapshotReceiverIsStart(pReceiver)) {
// start // start
snapshotReceiverDoStart(pReceiver, privateTerm, fromId); snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
pReceiver->start = true; pReceiver->start = true;
} else { } else {
...@@ -388,7 +398,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer ...@@ -388,7 +398,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
// start again // start again
snapshotReceiverDoStart(pReceiver, privateTerm, fromId); snapshotReceiverDoStart(pReceiver, privateTerm, pBeginMsg);
pReceiver->start = true; pReceiver->start = true;
} }
...@@ -449,6 +459,15 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { ...@@ -449,6 +459,15 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId); cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
cJSON_AddItemToObject(pRoot, "fromId", pFromId); cJSON_AddItemToObject(pRoot, "fromId", pFromId);
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastApplyIndex);
cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastApplyTerm);
cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->snapshot.lastConfigIndex);
cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term); snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term);
cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddStringToObject(pRoot, "term", u64buf);
...@@ -477,8 +496,9 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) ...@@ -477,8 +496,9 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event)
uint16_t port; uint16_t port;
syncUtilU642Addr(fromId.addr, host, sizeof(host), &port); syncUtilU642Addr(fromId.addr, host, sizeof(host), &port);
snprintf(s, len, "%s %p start:%d ack:%d term:%lu pterm:%lu %s:%d ", event, pReceiver, pReceiver->start, snprintf(s, len, "%s %p start:%d ack:%d term:%lu pterm:%lu from:%s:%d laindex:%ld laterm:%lu lcindex:%ld", event,
pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port); pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term, pReceiver->privateTerm, host, port,
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex);
return s; return s;
} }
...@@ -495,7 +515,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -495,7 +515,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) { if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
// begin // begin
snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg->srcId); snapshotReceiverStart(pReceiver, pMsg->privateTerm, pMsg);
pReceiver->ack = pMsg->seq; pReceiver->ack = pMsg->seq;
needRsp = true; needRsp = true;
...@@ -519,42 +539,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -519,42 +539,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// update new config myIndex // update new config myIndex
SSyncCfg newSyncCfg = pMsg->lastConfig; SSyncCfg newSyncCfg = pMsg->lastConfig;
syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg); syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg);
bool IamInNew = syncNodeInConfig(pSyncNode, &newSyncCfg);
bool isDrop = false;
if (IamInNew) {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog),
"update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld", pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex);
syncNodeEventLog(pSyncNode, eventLog);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog),
"do not update config by snapshot, not in new, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld",
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
syncNodeEventLog(pSyncNode, eventLog);
}
// change isStandBy to normal
if (!isDrop) {
char tmpbuf[512];
char *oldStr = syncCfg2SimpleStr(&oldSyncCfg);
char *newStr = syncCfg2SimpleStr(&newSyncCfg);
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, index:%ld, %s --> %s",
oldSyncCfg.replicaNum, newSyncCfg.replicaNum, pMsg->lastConfigIndex, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { // do config change
syncNodeBecomeLeader(pSyncNode, tmpbuf); syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex);
} else {
syncNodeBecomeFollower(pSyncNode, tmpbuf);
}
}
} }
SSnapshot snapshot; SSnapshot snapshot;
......
...@@ -261,23 +261,29 @@ bool syncUtilIsData(tmsg_t msgType) { ...@@ -261,23 +261,29 @@ bool syncUtilIsData(tmsg_t msgType) {
#endif #endif
bool syncUtilUserPreCommit(tmsg_t msgType) { bool syncUtilUserPreCommit(tmsg_t msgType) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) { if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH &&
msgType != TDMT_SYNC_LEADER_TRANSFER) {
return true; return true;
} }
return false; return false;
} }
bool syncUtilUserCommit(tmsg_t msgType) { bool syncUtilUserCommit(tmsg_t msgType) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) { if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH &&
msgType != TDMT_SYNC_LEADER_TRANSFER) {
return true; return true;
} }
return false; return false;
} }
bool syncUtilUserRollback(tmsg_t msgType) { bool syncUtilUserRollback(tmsg_t msgType) {
if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_LEADER_TRANSFER) { if (msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_CONFIG_CHANGE && msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH &&
msgType != TDMT_SYNC_LEADER_TRANSFER) {
return true; return true;
} }
return false; return false;
} }
......
...@@ -49,6 +49,7 @@ add_executable(syncRaftLogTest "") ...@@ -49,6 +49,7 @@ add_executable(syncRaftLogTest "")
add_executable(syncRaftLogTest2 "") add_executable(syncRaftLogTest2 "")
add_executable(syncRaftLogTest3 "") add_executable(syncRaftLogTest3 "")
add_executable(syncLeaderTransferTest "") add_executable(syncLeaderTransferTest "")
add_executable(syncReconfigFinishTest "")
target_sources(syncTest target_sources(syncTest
...@@ -255,6 +256,10 @@ target_sources(syncLeaderTransferTest ...@@ -255,6 +256,10 @@ target_sources(syncLeaderTransferTest
PRIVATE PRIVATE
"syncLeaderTransferTest.cpp" "syncLeaderTransferTest.cpp"
) )
target_sources(syncReconfigFinishTest
PRIVATE
"syncReconfigFinishTest.cpp"
)
target_include_directories(syncTest target_include_directories(syncTest
...@@ -512,6 +517,11 @@ target_include_directories(syncLeaderTransferTest ...@@ -512,6 +517,11 @@ target_include_directories(syncLeaderTransferTest
"${TD_SOURCE_DIR}/include/libs/sync" "${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncReconfigFinishTest
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest target_link_libraries(syncTest
...@@ -718,6 +728,10 @@ target_link_libraries(syncLeaderTransferTest ...@@ -718,6 +728,10 @@ target_link_libraries(syncLeaderTransferTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncReconfigFinishTest
sync
gtest_main
)
enable_testing() enable_testing()
......
...@@ -147,8 +147,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_ ...@@ -147,8 +147,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag,
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
} }
SSyncFSM* createFsm() { SSyncFSM* createFsm() {
......
...@@ -78,8 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { ...@@ -78,8 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag,
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
} }
SSyncFSM* createFsm() { SSyncFSM* createFsm() {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SSyncCfg* createSyncOldCfg() {
SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg));
memset(pCfg, 0, sizeof(SSyncCfg));
pCfg->replicaNum = 3;
pCfg->myIndex = 1;
for (int i = 0; i < pCfg->replicaNum; ++i) {
((pCfg->nodeInfo)[i]).nodePort = i * 100;
snprintf(((pCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i);
}
return pCfg;
}
SSyncCfg* createSyncNewCfg() {
SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg));
memset(pCfg, 0, sizeof(SSyncCfg));
pCfg->replicaNum = 3;
pCfg->myIndex = 1;
for (int i = 0; i < pCfg->replicaNum; ++i) {
((pCfg->nodeInfo)[i]).nodePort = i * 100;
snprintf(((pCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->nodeInfo)[i]).nodeFqdn), "500.600.700.%d", i);
}
return pCfg;
}
SyncReconfigFinish *createMsg() {
SyncReconfigFinish *pMsg = syncReconfigFinishBuild(1234);
SSyncCfg* pOld = createSyncOldCfg();
SSyncCfg* pNew = createSyncNewCfg();
pMsg->oldCfg = *pOld;
pMsg->newCfg = *pNew;
pMsg->newCfgIndex = 11;
pMsg->newCfgTerm = 22;
pMsg->newCfgSeqNum = 33;
taosMemoryFree(pOld);
taosMemoryFree(pNew);
return pMsg;
}
void test1() {
SyncReconfigFinish *pMsg = createMsg();
syncReconfigFinishLog2((char *)"test1:", pMsg);
syncReconfigFinishDestroy(pMsg);
}
void test2() {
SyncReconfigFinish *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)taosMemoryMalloc(len);
syncReconfigFinishSerialize(pMsg, serialized, len);
SyncReconfigFinish *pMsg2 = syncReconfigFinishBuild(1000);
syncReconfigFinishDeserialize(serialized, len, pMsg2);
syncReconfigFinishLog2((char *)"test2: syncReconfigFinishSerialize -> syncReconfigFinishDeserialize ", pMsg2);
taosMemoryFree(serialized);
syncReconfigFinishDestroy(pMsg);
syncReconfigFinishDestroy(pMsg2);
}
void test3() {
SyncReconfigFinish *pMsg = createMsg();
uint32_t len;
char * serialized = syncReconfigFinishSerialize2(pMsg, &len);
SyncReconfigFinish *pMsg2 = syncReconfigFinishDeserialize2(serialized, len);
syncReconfigFinishLog2((char *)"test3: SyncReconfigFinishSerialize2 -> syncReconfigFinishDeserialize2 ", pMsg2);
taosMemoryFree(serialized);
syncReconfigFinishDestroy(pMsg);
syncReconfigFinishDestroy(pMsg2);
}
void test4() {
SyncReconfigFinish *pMsg = createMsg();
SRpcMsg rpcMsg;
syncReconfigFinish2RpcMsg(pMsg, &rpcMsg);
SyncReconfigFinish *pMsg2 = (SyncReconfigFinish *)taosMemoryMalloc(rpcMsg.contLen);
syncReconfigFinishFromRpcMsg(&rpcMsg, pMsg2);
syncReconfigFinishLog2((char *)"test4: syncReconfigFinish2RpcMsg -> syncReconfigFinishFromRpcMsg ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncReconfigFinishDestroy(pMsg);
syncReconfigFinishDestroy(pMsg2);
}
void test5() {
SyncReconfigFinish *pMsg = createMsg();
SRpcMsg rpcMsg;
syncReconfigFinish2RpcMsg(pMsg, &rpcMsg);
SyncReconfigFinish *pMsg2 = syncReconfigFinishFromRpcMsg2(&rpcMsg);
syncReconfigFinishLog2((char *)"test5: syncReconfigFinish2RpcMsg -> syncReconfigFinishFromRpcMsg2 ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncReconfigFinishDestroy(pMsg);
syncReconfigFinishDestroy(pMsg2);
}
int main() {
gRaftDetailLog = true;
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}
...@@ -148,8 +148,8 @@ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFini ...@@ -148,8 +148,8 @@ void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFini
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) { void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta) {
char* s = syncCfg2Str(&(cbMeta.newCfg)); char* s = syncCfg2Str(&(cbMeta.newCfg));
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s", sTrace("==callback== ==ReConfigCb== flag:0x%lX, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s); cbMeta.flag, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term, s);
taosMemoryFree(s); taosMemoryFree(s);
} }
......
...@@ -423,6 +423,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader") ...@@ -423,6 +423,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RECONFIG_NOT_READY, "Sync not ready for reconfig")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose")
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
// wal // wal
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册