未验证 提交 a5e9308f 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #10943 from taosdata/feature/3.0_mhli

Feature/3.0 mhli
...@@ -35,6 +35,7 @@ typedef enum { ...@@ -35,6 +35,7 @@ typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 100, TAOS_SYNC_STATE_FOLLOWER = 100,
TAOS_SYNC_STATE_CANDIDATE = 101, TAOS_SYNC_STATE_CANDIDATE = 101,
TAOS_SYNC_STATE_LEADER = 102, TAOS_SYNC_STATE_LEADER = 102,
TAOS_SYNC_STATE_ERROR = 103,
} ESyncState; } ESyncState;
typedef struct SSyncBuffer { typedef struct SSyncBuffer {
...@@ -68,17 +69,20 @@ typedef struct SSnapshot { ...@@ -68,17 +69,20 @@ typedef struct SSnapshot {
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* data; void* data;
// when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result // when value in pMsg finish a raft flow, FpCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb // user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state);
// when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result // when value in pMsg has been written into local log store, FpPreCommitCb is called, code indicates the result
// user can do something according to the code and isWeak. for example, write data into tsdb // user can do something according to the code and isWeak. for example, write data into tsdb
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state);
// when log entry is updated by a new one, FpRollBackCb is called // when log entry is updated by a new one, FpRollBackCb is called
// user can do something to roll back. for example, delete data from tsdb, or just ignore it // user can do something to roll back. for example, delete data from tsdb, or just ignore it
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state);
// user should implement this function, use "data" to take snapshot into "snapshot" // user should implement this function, use "data" to take snapshot into "snapshot"
int32_t (*FpTakeSnapshot)(SSnapshot* snapshot); int32_t (*FpTakeSnapshot)(SSnapshot* snapshot);
...@@ -157,10 +161,14 @@ void syncCleanUp(); ...@@ -157,10 +161,14 @@ void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo); int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid); void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // use this function int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); // just for compatibility
ESyncState syncGetMyRole(int64_t rid); ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);
// propose with sequence number, to implement linearizable semantics
int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum);
// for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
extern int32_t sDebugFlag; extern int32_t sDebugFlag;
......
...@@ -47,6 +47,8 @@ SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); ...@@ -47,6 +47,8 @@ SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore); cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore); char* logStore2Str(SSyncLogStore* pLogStore);
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore);
char* logStoreSimple2Str(SSyncLogStore* pLogStore);
// for debug // for debug
void logStorePrint(SSyncLogStore* pLogStore); void logStorePrint(SSyncLogStore* pLogStore);
...@@ -54,6 +56,11 @@ void logStorePrint2(char* s, SSyncLogStore* pLogStore); ...@@ -54,6 +56,11 @@ void logStorePrint2(char* s, SSyncLogStore* pLogStore);
void logStoreLog(SSyncLogStore* pLogStore); void logStoreLog(SSyncLogStore* pLogStore);
void logStoreLog2(char* s, SSyncLogStore* pLogStore); void logStoreLog2(char* s, SSyncLogStore* pLogStore);
void logStoreSimplePrint(SSyncLogStore* pLogStore);
void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore);
void logStoreSimpleLog(SSyncLogStore* pLogStore);
void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -120,6 +120,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -120,6 +120,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// reject request // reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) || if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { ((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
sTrace(
"syncNodeOnAppendEntriesCb --> reject, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
...@@ -137,6 +142,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -137,6 +142,11 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// return to follower state // return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
sTrace(
"syncNodeOnAppendEntriesCb --> return to follower, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, "
"ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths); syncNodeBecomeFollower(ths);
// ret or reply? // ret or reply?
...@@ -145,29 +155,22 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -145,29 +155,22 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// accept request // accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
bool preMatch = false; // preIndex = -1, or has preIndex entry in local log
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID &&
ths->pLogStore->getLastIndex(ths->pLogStore) == SYNC_INDEX_INVALID) {
preMatch = true;
}
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pPreEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pPreEntry != NULL);
if (pMsg->prevLogTerm == pPreEntry->term) {
preMatch = true;
}
syncEntryDestory(pPreEntry);
}
if (preMatch) {
// must has preIndex in local log
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
// has extra entries (> preIndex) in local log
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0; bool hasAppendEntries = pMsg->dataLen > 0;
sTrace(
"syncNodeOnAppendEntriesCb --> accept, pMsg->term:%lu, ths->pRaftStore->currentTerm:%lu, ths->state:%d, "
"logOK:%d, hasExtraEntries:%d, hasAppendEntries:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries);
if (hasExtraEntries && hasAppendEntries) { if (hasExtraEntries && hasAppendEntries) {
// conflict // not conflict by default
bool conflict = false; bool conflict = false;
SyncIndex extraIndex = pMsg->prevLogIndex + 1; SyncIndex extraIndex = pMsg->prevLogIndex + 1;
...@@ -177,8 +180,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -177,8 +180,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL); assert(pAppendEntry != NULL);
// log not match, conflict
assert(extraIndex == pAppendEntry->index); assert(extraIndex == pAppendEntry->index);
if (pExtraEntry->term == pAppendEntry->term) { if (pExtraEntry->term != pAppendEntry->term) {
conflict = true; conflict = true;
} }
...@@ -187,6 +191,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -187,6 +191,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore); SyncIndex delBegin = ths->pLogStore->getLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex; SyncIndex delEnd = extraIndex;
sTrace("syncNodeOnAppendEntriesCb --> conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
// notice! reverse roll back! // notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) { for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) { if (ths->pFsm->FpRollBackCb != NULL) {
...@@ -195,7 +201,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -195,7 +201,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0); ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, pRollBackEntry->index, pRollBackEntry->isWeak, 0, ths->state);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pRollBackEntry); syncEntryDestory(pRollBackEntry);
} }
...@@ -212,7 +218,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -212,7 +218,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 2, ths->state);
} }
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
...@@ -237,7 +243,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -237,7 +243,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 0); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pAppendEntry->index, pAppendEntry->isWeak, 3, ths->state);
} }
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
...@@ -267,23 +273,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -267,23 +273,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply); syncAppendEntriesReplyDestroy(pReply);
} else {
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild();
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
}
// maybe update commit index from leader // maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) { if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local // has commit entry in local
...@@ -308,7 +299,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -308,7 +299,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm->FpCommitCb != NULL) { if (ths->pFsm->FpCommitCb != NULL) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
......
...@@ -48,6 +48,9 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -48,6 +48,9 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
return ret; return ret;
} }
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term. // no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) { // if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term); // syncNodeUpdateTerm(ths, pMsg->term);
...@@ -77,5 +80,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -77,5 +80,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
} }
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);
return ret; return ret;
} }
...@@ -63,7 +63,14 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -63,7 +63,14 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { if (pEntry->term == pSyncNode->pRaftStore->currentTerm) {
// update commit index // update commit index
newCommitIndex = index; newCommitIndex = index;
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld",
newCommitIndex, pSyncNode->commitIndex);
break; break;
} else {
sTrace(
"syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, "
"pSyncNode->pRaftStore->currentTerm:%lu",
pEntry->term, pSyncNode->pRaftStore->currentTerm);
} }
} }
} }
...@@ -91,7 +98,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -91,7 +98,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (pSyncNode->pFsm->FpCommitCb != NULL) { if (pSyncNode->pFsm->FpCommitCb != NULL) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, pSyncNode->state);
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
......
...@@ -257,13 +257,6 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -257,13 +257,6 @@ static void *syncIOConsumerFunc(void *param) {
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg); io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg); syncPingDestroy(pSyncMsg);
/*
pSyncMsg = syncPingBuild(pRpcMsg->contLen);
syncPingFromRpcMsg(pRpcMsg, pSyncMsg);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
*/
} }
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) { } else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
......
...@@ -104,9 +104,26 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { ...@@ -104,9 +104,26 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) {
} }
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = syncPropose2(rid, pMsg, isWeak, 0);
return ret;
}
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret;
}
// todo : get pointer from rid ESyncState syncGetMyRole(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
assert(rid == pSyncNode->rid);
return pSyncNode->state;
}
int32_t syncPropose2(int64_t rid, const SRpcMsg* pMsg, bool isWeak, uint64_t seqNum) {
int32_t ret = 0;
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
return -1; return -1;
...@@ -114,7 +131,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -114,7 +131,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, seqNum, isWeak);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
...@@ -122,7 +139,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -122,7 +139,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
ret = 0; ret = 0;
} else { } else {
sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); sTrace("syncPropose not leader, %s", syncUtilState2String(pSyncNode->state));
ret = -1; // todo : need define err code !! ret = -1; // todo : need define err code !!
} }
...@@ -130,19 +147,6 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -130,19 +147,6 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return ret; return ret;
} }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret;
}
ESyncState syncGetMyRole(int64_t rid) {
// todo : get pointer from rid
SSyncNode* pSyncNode = NULL;
return pSyncNode->state;
}
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {}
// open/close -------------- // open/close --------------
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode));
...@@ -848,7 +852,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ...@@ -848,7 +852,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 0, ths->state);
} }
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
...@@ -863,7 +867,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg ...@@ -863,7 +867,7 @@ static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL) { if (ths->pFsm->FpPreCommitCb != NULL) {
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, -2); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, pEntry->index, pEntry->isWeak, 1, ths->state);
} }
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
......
...@@ -834,7 +834,7 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) { ...@@ -834,7 +834,7 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogIndex); snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastLogIndex);
cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf); cJSON_AddStringToObject(pRoot, "lastLogIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->lastLogTerm);
cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf); cJSON_AddStringToObject(pRoot, "lastLogTerm", u64buf);
...@@ -1127,14 +1127,14 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) { ...@@ -1127,14 +1127,14 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogIndex); snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->prevLogIndex);
cJSON_AddStringToObject(pRoot, "pre_log_index", u64buf); cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm);
cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf); cJSON_AddStringToObject(pRoot, "pre_log_term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->commitIndex); snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->commitIndex);
cJSON_AddStringToObject(pRoot, "commit_index", u64buf); cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
char* s; char* s;
...@@ -1288,7 +1288,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) { ...@@ -1288,7 +1288,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
cJSON_AddStringToObject(pRoot, "term", u64buf); cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "success", pMsg->success); cJSON_AddNumberToObject(pRoot, "success", pMsg->success);
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->matchIndex); snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->matchIndex);
cJSON_AddStringToObject(pRoot, "matchIndex", u64buf); cJSON_AddStringToObject(pRoot, "matchIndex", u64buf);
} }
......
...@@ -165,6 +165,34 @@ char* logStore2Str(SSyncLogStore* pLogStore) { ...@@ -165,6 +165,34 @@ char* logStore2Str(SSyncLogStore* pLogStore) {
return serialized; return serialized;
} }
cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
char u64buf[128];
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
cJSON* pRoot = cJSON_CreateObject();
if (pData != NULL && pData->pWal != NULL) {
snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncLogStoreSimple", pRoot);
return pJson;
}
char* logStoreSimple2Str(SSyncLogStore* pLogStore) {
cJSON* pJson = logStoreSimple2Json(pLogStore);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug ----------------- // for debug -----------------
void logStorePrint(SSyncLogStore* pLogStore) { void logStorePrint(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore); char* serialized = logStore2Str(pLogStore);
...@@ -191,3 +219,30 @@ void logStoreLog2(char* s, SSyncLogStore* pLogStore) { ...@@ -191,3 +219,30 @@ void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized); sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized); free(serialized);
} }
// for debug -----------------
void logStoreSimplePrint(SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore);
printf("logStoreSimplePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void logStoreSimplePrint2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore);
printf("logStoreSimplePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void logStoreSimpleLog(SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore);
sTrace("logStoreSimpleLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStoreSimple2Str(pLogStore);
sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
...@@ -49,6 +49,10 @@ ...@@ -49,6 +49,10 @@
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pMatchIndex", pSyncNode->pMatchIndex);
logStoreSimpleLog2("==syncNodeAppendEntriesPeers==", pSyncNode->pLogStore);
int32_t ret = 0; int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]); SRaftId* pDestId = &(pSyncNode->peersId[i]);
...@@ -99,6 +103,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { ...@@ -99,6 +103,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
pMsg->prevLogTerm = preLogTerm; pMsg->prevLogTerm = preLogTerm;
pMsg->commitIndex = pSyncNode->commitIndex; pMsg->commitIndex = pSyncNode->commitIndex;
syncAppendEntriesLog2("==syncNodeAppendEntriesPeers==", pMsg);
// send AppendEntries // send AppendEntries
syncNodeAppendEntries(pSyncNode, pDestId, pMsg); syncNodeAppendEntries(pSyncNode, pDestId, pMsg);
syncAppendEntriesDestroy(pMsg); syncAppendEntriesDestroy(pMsg);
......
...@@ -29,10 +29,14 @@ add_executable(syncPingTimerTest2 "") ...@@ -29,10 +29,14 @@ add_executable(syncPingTimerTest2 "")
add_executable(syncPingSelfTest "") add_executable(syncPingSelfTest "")
add_executable(syncElectTest "") add_executable(syncElectTest "")
add_executable(syncElectTest2 "") add_executable(syncElectTest2 "")
add_executable(syncElectTest3 "")
add_executable(syncEncodeTest "") add_executable(syncEncodeTest "")
add_executable(syncWriteTest "") add_executable(syncWriteTest "")
add_executable(syncReplicateTest "") add_executable(syncReplicateTest "")
add_executable(syncReplicateTest2 "")
add_executable(syncReplicateLoadTest "")
add_executable(syncRefTest "") add_executable(syncRefTest "")
add_executable(syncLogStoreCheck "")
target_sources(syncTest target_sources(syncTest
...@@ -159,6 +163,10 @@ target_sources(syncElectTest2 ...@@ -159,6 +163,10 @@ target_sources(syncElectTest2
PRIVATE PRIVATE
"syncElectTest2.cpp" "syncElectTest2.cpp"
) )
target_sources(syncElectTest3
PRIVATE
"syncElectTest3.cpp"
)
target_sources(syncEncodeTest target_sources(syncEncodeTest
PRIVATE PRIVATE
"syncEncodeTest.cpp" "syncEncodeTest.cpp"
...@@ -171,10 +179,22 @@ target_sources(syncReplicateTest ...@@ -171,10 +179,22 @@ target_sources(syncReplicateTest
PRIVATE PRIVATE
"syncReplicateTest.cpp" "syncReplicateTest.cpp"
) )
target_sources(syncReplicateTest2
PRIVATE
"syncReplicateTest2.cpp"
)
target_sources(syncReplicateLoadTest
PRIVATE
"syncReplicateLoadTest.cpp"
)
target_sources(syncRefTest target_sources(syncRefTest
PRIVATE PRIVATE
"syncRefTest.cpp" "syncRefTest.cpp"
) )
target_sources(syncLogStoreCheck
PRIVATE
"syncLogStoreCheck.cpp"
)
target_include_directories(syncTest target_include_directories(syncTest
...@@ -332,6 +352,11 @@ target_include_directories(syncElectTest2 ...@@ -332,6 +352,11 @@ target_include_directories(syncElectTest2
"${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncElectTest3
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEncodeTest target_include_directories(syncEncodeTest
PUBLIC PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_SOURCE_DIR}/include/libs/sync"
...@@ -347,11 +372,26 @@ target_include_directories(syncReplicateTest ...@@ -347,11 +372,26 @@ target_include_directories(syncReplicateTest
"${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncReplicateTest2
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncReplicateLoadTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRefTest target_include_directories(syncRefTest
PUBLIC PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncLogStoreCheck
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest target_link_libraries(syncTest
...@@ -478,6 +518,10 @@ target_link_libraries(syncElectTest2 ...@@ -478,6 +518,10 @@ target_link_libraries(syncElectTest2
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncElectTest3
sync
gtest_main
)
target_link_libraries(syncEncodeTest target_link_libraries(syncEncodeTest
sync sync
gtest_main gtest_main
...@@ -490,10 +534,22 @@ target_link_libraries(syncReplicateTest ...@@ -490,10 +534,22 @@ target_link_libraries(syncReplicateTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncReplicateTest2
sync
gtest_main
)
target_link_libraries(syncReplicateLoadTest
sync
gtest_main
)
target_link_libraries(syncRefTest target_link_libraries(syncRefTest
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncLogStoreCheck
sync
gtest_main
)
enable_testing() enable_testing()
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "tref.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SWal* pWal;
int64_t syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "./elect3_test_%d", myIndex);
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
char tmpdir[128];
snprintf(tmpdir, sizeof(tmpdir), "./elect3_test_wal_%d", myIndex);
pWal = walOpen(tmpdir, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
int64_t rid = syncStart(&syncInfo);
assert(rid > 0);
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
pSyncNode->hbBaseLine = 500;
pSyncNode->electBaseLine = 1500;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
return rid;
}
void initRaftId(SSyncNode* pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char* s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncInit();
assert(ret == 0);
int64_t rid = syncNodeInit();
assert(rid > 0);
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
syncNodePrint2((char*)"", pSyncNode);
initRaftId(pSyncNode);
//---------------------------
while (1) {
sTrace(
"elect sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, electTimerMS:%d",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
taosMsleep(1000);
}
syncNodeRelease(pSyncNode);
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 1;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SWal* pWal;
SSyncNode* pSyncNode;
SSyncNode* syncNodeInit(const char* path) {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./log_check");
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
pWal = walOpen(path, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
SSyncNode* logStoreCheck(const char* path) { return syncNodeInit(path); }
int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
pSyncNode = logStoreCheck(argv[1]);
assert(pSyncNode != NULL);
logStorePrint2((char*)"logStoreCheck", pSyncNode->pLogStore);
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM *pFsm;
SWal * pWal;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void initFsm() {
pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
}
int64_t syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex);
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
char tmpdir[128];
snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex);
pWal = walOpen(tmpdir, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
int64_t rid = syncStart(&syncInfo);
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
// pSyncNode->hbBaseLine = 500;
// pSyncNode->electBaseLine = 1500;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
return rid;
}
void initRaftId(SSyncNode *pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char *s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
SRpcMsg *step0(int i) {
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->contLen = 128;
pMsg->pCont = malloc(pMsg->contLen);
snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i);
return pMsg;
}
SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
return pRetMsg;
}
int main(int argc, char **argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
void logTest();
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
initFsm();
ret = syncInit();
assert(ret == 0);
int64_t rid = syncNodeInit();
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
syncNodePrint2((char *)"", pSyncNode);
initRaftId(pSyncNode);
// only load ...
while (1) {
sTrace(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
taosMsleep(1000);
}
return 0;
}
...@@ -28,19 +28,29 @@ SSyncFSM * pFsm; ...@@ -28,19 +28,29 @@ SSyncFSM * pFsm;
SWal * pWal; SWal * pWal;
SSyncNode *gSyncNode; SSyncNode *gSyncNode;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); ESyncState state) {
syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf); char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
} }
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); ESyncState state) {
syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf); char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
} }
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); ESyncState state) {
syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf); char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
} }
void initFsm() { void initFsm() {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM *pFsm;
SWal * pWal;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
ESyncState state) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
}
void initFsm() {
pFsm = (SSyncFSM *)malloc(sizeof(SSyncFSM));
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
}
int64_t syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "./replicate2_test_%d", myIndex);
int code = walInit();
assert(code == 0);
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = syncInfo.vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
char tmpdir[128];
snprintf(tmpdir, sizeof(tmpdir), "./replicate2_test_wal_%d", myIndex);
pWal = walOpen(tmpdir, &walCfg);
assert(pWal != NULL);
syncInfo.pWal = pWal;
SSyncCfg *pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
int64_t rid = syncStart(&syncInfo);
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
// pSyncNode->hbBaseLine = 500;
// pSyncNode->electBaseLine = 1500;
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
gSyncIO->pSyncNode = pSyncNode;
syncNodeRelease(pSyncNode);
return rid;
}
void initRaftId(SSyncNode *pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char *s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
SRpcMsg *step0(int i) {
SRpcMsg *pMsg = (SRpcMsg *)malloc(sizeof(SRpcMsg));
memset(pMsg, 0, sizeof(SRpcMsg));
pMsg->msgType = 9999;
pMsg->contLen = 128;
pMsg->pCont = malloc(pMsg->contLen);
snprintf((char *)(pMsg->pCont), pMsg->contLen, "value-%u-%d", ports[myIndex], i);
return pMsg;
}
SyncClientRequest *step1(const SRpcMsg *pMsg) {
SyncClientRequest *pRetMsg = syncClientRequestBuild2(pMsg, 123, true);
return pRetMsg;
}
int main(int argc, char **argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
void logTest();
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
initFsm();
ret = syncInit();
assert(ret == 0);
int64_t rid = syncNodeInit();
assert(rid > 0);
SSyncNode *pSyncNode = (SSyncNode *)syncNodeAcquire(rid);
assert(pSyncNode != NULL);
syncNodePrint2((char *)"", pSyncNode);
initRaftId(pSyncNode);
for (int i = 0; i < 30; ++i) {
// step0
SRpcMsg *pMsg0 = step0(i);
syncRpcMsgPrint2((char *)"==step0==", pMsg0);
syncPropose(rid, pMsg0, true);
taosMsleep(1000);
free(pMsg0);
sTrace(
"syncPropose sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
}
while (1) {
sTrace(
"replicate sleep, state: %d, %s, term:%lu electTimerLogicClock:%lu, electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
pSyncNode->state, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm,
pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
taosMsleep(1000);
}
return 0;
}
...@@ -28,19 +28,29 @@ SSyncFSM * pFsm; ...@@ -28,19 +28,29 @@ SSyncFSM * pFsm;
SWal * pWal; SWal * pWal;
SSyncNode *gSyncNode; SSyncNode *gSyncNode;
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
printf("==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); ESyncState state) {
syncRpcMsgPrint2((char *)"==CommitCb==", (SRpcMsg *)pBuf); char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
} }
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
printf("==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); ESyncState state) {
syncRpcMsgPrint2((char *)"==PreCommitCb==", (SRpcMsg *)pBuf); char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, index, isWeak,
code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
} }
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pBuf, SyncIndex index, bool isWeak, int32_t code) { void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SyncIndex index, bool isWeak, int32_t code,
printf("==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d \n", pFsm, index, isWeak, code); ESyncState state) {
syncRpcMsgPrint2((char *)"==RollBackCb==", (SRpcMsg *)pBuf); char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, index, isWeak, code, state, syncUtilState2String(state));
syncRpcMsgPrint2(logBuf, (SRpcMsg *)pMsg);
} }
void initFsm() { void initFsm() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册