未验证 提交 4f37eb50 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17825 from taosdata/fix/TD-20052

refact: adjust sync.h and syncState
...@@ -25,8 +25,6 @@ extern "C" { ...@@ -25,8 +25,6 @@ extern "C" {
#include "tlrucache.h" #include "tlrucache.h"
#include "tmsgcb.h" #include "tmsgcb.h"
extern bool gRaftDetailLog;
#define SYNC_RESP_TTL_MS 10000000 #define SYNC_RESP_TTL_MS 10000000
#define SYNC_SPEED_UP_HB_TIMER 400 #define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20) #define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
...@@ -132,7 +130,7 @@ typedef struct SSnapshotMeta { ...@@ -132,7 +130,7 @@ typedef struct SSnapshotMeta {
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* data; void* data;
void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta *pMeta); void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
...@@ -202,37 +200,27 @@ typedef struct SSyncInfo { ...@@ -202,37 +200,27 @@ typedef struct SSyncInfo {
int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
int32_t syncInit(); typedef struct SSyncState {
void syncCleanUp(); ESyncState state;
bool syncIsInit(); bool restored;
int64_t syncOpen(SSyncInfo* pSyncInfo); } SSyncState;
void syncStart(int64_t rid);
void syncStop(int64_t rid); int32_t syncInit();
ESyncState syncGetMyRole(int64_t rid); void syncCleanUp();
bool syncIsReady(int64_t rid); int64_t syncOpen(SSyncInfo* pSyncInfo);
const char* syncGetMyRoleStr(int64_t rid); void syncStart(int64_t rid);
bool syncRestoreFinish(int64_t rid); void syncStop(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
SyncIndex syncGetLastIndex(int64_t rid); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
SyncIndex syncGetCommitIndex(int64_t rid);
SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
int32_t syncEndSnapshot(int64_t rid); int32_t syncEndSnapshot(int64_t rid);
int32_t syncLeaderTransfer(int64_t rid);
int32_t syncStepDown(int64_t rid, SyncTerm newTerm); int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); SSyncState syncGetState(int64_t rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
const char* syncUtilState2String(ESyncState state); const char* syncStr(ESyncState state);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -487,14 +487,14 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -487,14 +487,14 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
} }
if (mndAcquireRpc(pMsg->info.node) == 0) return 0; if (mndAcquireRpc(pMsg->info.node) == 0) return 0;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
const char *role = syncGetMyRoleStr(pMnode->syncMgmt.sync); SSyncState state = syncGetState(pMnode->syncMgmt.sync);
bool restored = syncIsRestoreFinish(pMnode->syncMgmt.sync);
if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER ||
pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER ||
pMsg->msgType == TDMT_MND_UPTIME_TIMER) { pMsg->msgType == TDMT_MND_UPTIME_TIMER) {
mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored,
pMnode->stopped, restored, role); pMnode->stopped, state.restored, syncStr(state.restored));
return -1; return -1;
} }
...@@ -505,8 +505,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { ...@@ -505,8 +505,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
mDebug( mDebug(
"msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d " "msg:%p, type:%s failed to process since %s, mnode restored:%d stopped:%d, sync restored:%d "
"role:%s, redirect numOfEps:%d inUse:%d", "role:%s, redirect numOfEps:%d inUse:%d",
pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, restored, role, epSet.numOfEps, pMsg, TMSG_INFO(pMsg->msgType), terrstr(), pMnode->restored, pMnode->stopped, state.restored,
epSet.inUse); syncStr(state.restored), epSet.numOfEps, epSet.inUse);
if (epSet.numOfEps > 0) { if (epSet.numOfEps > 0) {
for (int32_t i = 0; i < epSet.numOfEps; ++i) { for (int32_t i = 0; i < epSet.numOfEps; ++i) {
...@@ -729,8 +729,9 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr ...@@ -729,8 +729,9 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
} }
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) { int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync); SSyncState state = syncGetState(pMnode->syncMgmt.sync);
pLoad->syncRestore = pMnode->restored; pLoad->syncState = state.state;
pLoad->syncRestore = state.restored;
mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore); mTrace("mnode current syncState is %s, syncRestore:%d", syncStr(pLoad->syncState), pLoad->syncRestore);
return 0; return 0;
} }
......
...@@ -349,11 +349,15 @@ void mndSyncStop(SMnode *pMnode) { ...@@ -349,11 +349,15 @@ void mndSyncStop(SMnode *pMnode) {
} }
bool mndIsLeader(SMnode *pMnode) { bool mndIsLeader(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncState state = syncGetState(pMnode->syncMgmt.sync);
if (!syncIsReady(pMgmt->sync)) { if (state.state != TAOS_SYNC_STATE_LEADER || !state.restored) {
// get terrno from syncIsReady if (state.state != TAOS_SYNC_STATE_LEADER) {
// terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
mDebug("vgId:1, mnode not ready, state:%s, restore:%d", syncStr(state.state), state.restored);
return false; return false;
} }
......
...@@ -414,9 +414,11 @@ _exit: ...@@ -414,9 +414,11 @@ _exit:
} }
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
SSyncState state = syncGetState(pVnode->sync);
pLoad->vgId = TD_VID(pVnode); pLoad->vgId = TD_VID(pVnode);
pLoad->syncState = syncGetMyRole(pVnode->sync); pLoad->syncState = state.state;
pLoad->syncRestore = pVnode->restored; pLoad->syncRestore = state.restored;
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode); pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta); pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
......
...@@ -309,13 +309,13 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const S ...@@ -309,13 +309,13 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const S
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s", ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak,
pMeta->code, pMeta->state, syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType)); pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else { } else {
SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info};
vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync), vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", pVnode->config.vgId,
TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code)); TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code));
if (rsp.info.handle != NULL) { if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
...@@ -338,8 +338,8 @@ static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, con ...@@ -338,8 +338,8 @@ static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, con
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state),
syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
} }
#define USE_TSDB_SNAPSHOT #define USE_TSDB_SNAPSHOT
...@@ -552,12 +552,21 @@ void vnodeSyncClose(SVnode *pVnode) { ...@@ -552,12 +552,21 @@ void vnodeSyncClose(SVnode *pVnode) {
syncStop(pVnode->sync); syncStop(pVnode->sync);
} }
bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; } bool vnodeIsRoleLeader(SVnode *pVnode) {
SSyncState state = syncGetState(pVnode->sync);
return state.state == TAOS_SYNC_STATE_LEADER;
}
bool vnodeIsLeader(SVnode *pVnode) { bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) { SSyncState state = syncGetState(pVnode->sync);
vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
syncRestoreFinish(pVnode->sync)); if (state.state != TAOS_SYNC_STATE_LEADER || !state.restored) {
if (state.state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncStr(state.state), state.restored);
return false; return false;
} }
......
...@@ -52,6 +52,7 @@ typedef struct SSyncEnv { ...@@ -52,6 +52,7 @@ typedef struct SSyncEnv {
} SSyncEnv; } SSyncEnv;
SSyncEnv* syncEnv(); SSyncEnv* syncEnv();
bool syncIsInit();
int64_t syncNodeAdd(SSyncNode* pNode); int64_t syncNodeAdd(SSyncNode* pNode);
void syncNodeRemove(int64_t rid); void syncNodeRemove(int64_t rid);
......
...@@ -22,9 +22,9 @@ extern "C" { ...@@ -22,9 +22,9 @@ extern "C" {
#include "sync.h" #include "sync.h"
#include "syncTools.h" #include "syncTools.h"
#include "tlog.h"
#include "ttimer.h"
#include "taosdef.h" #include "taosdef.h"
#include "tlog.h"
#include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
// clang-format off // clang-format off
......
...@@ -20,15 +20,13 @@ ...@@ -20,15 +20,13 @@
extern "C" { extern "C" {
#endif #endif
#include "trpc.h"
// ------------------ ds ------------------- // ------------------ ds -------------------
typedef struct SRaftId { typedef struct SRaftId {
SyncNodeId addr; SyncNodeId addr;
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
char* sync2SimpleStr(int64_t rid); char* sync2SimpleStr(int64_t rid);
// for compatibility, the same as syncPropose // for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak);
......
...@@ -49,7 +49,7 @@ int32_t syncUtilQuorum(int32_t replicaNum); ...@@ -49,7 +49,7 @@ int32_t syncUtilQuorum(int32_t replicaNum);
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
cJSON* syncUtilRaftId2Json(const SRaftId* p); cJSON* syncUtilRaftId2Json(const SRaftId* p);
char* syncUtilRaftId2Str(const SRaftId* p); char* syncUtilRaftId2Str(const SRaftId* p);
const char* syncUtilState2String(ESyncState state); const char* syncStr(ESyncState state);
bool syncUtilCanPrint(char c); bool syncUtilCanPrint(char c);
char* syncUtilprintBin(char* ptr, uint32_t len); char* syncUtilprintBin(char* ptr, uint32_t len);
char* syncUtilprintBin2(char* ptr, uint32_t len); char* syncUtilprintBin2(char* ptr, uint32_t len);
......
...@@ -505,50 +505,20 @@ int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) { ...@@ -505,50 +505,20 @@ int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return ret; return ret;
} }
ESyncState syncGetMyRole(int64_t rid) { SSyncState syncGetState(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
ASSERT(rid == pSyncNode->rid);
ESyncState state = pSyncNode->state;
syncNodeRelease(pSyncNode);
return state;
}
bool syncIsReady(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode != NULL) {
return false; state.state = pSyncNode->state;
} state.restored = pSyncNode->restoreFinish;
ASSERT(rid == pSyncNode->rid); syncNodeRelease(pSyncNode);
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish;
syncNodeRelease(pSyncNode);
// if false, set error code
if (false == b) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
}
return b;
}
bool syncIsRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return false;
} }
ASSERT(rid == pSyncNode->rid);
bool b = pSyncNode->restoreFinish;
syncNodeRelease(pSyncNode); return state;
return b;
} }
#if 0
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) { int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
if (index < SYNC_INDEX_BEGIN) { if (index < SYNC_INDEX_BEGIN) {
return -1; return -1;
...@@ -618,6 +588,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct ...@@ -618,6 +588,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
return 0; return 0;
} }
#endif
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) { SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1); ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
...@@ -635,23 +606,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho ...@@ -635,23 +606,7 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
return lastIndex; return lastIndex;
} }
const char* syncGetMyRoleStr(int64_t rid) { #if 0
const char* s = syncUtilState2String(syncGetMyRole(rid));
return s;
}
bool syncRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return false;
}
ASSERT(rid == pSyncNode->rid);
bool restoreFinish = pSyncNode->restoreFinish;
syncNodeRelease(pSyncNode);
return restoreFinish;
}
SyncTerm syncGetMyTerm(int64_t rid) { SyncTerm syncGetMyTerm(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
...@@ -719,6 +674,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { ...@@ -719,6 +674,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} }
#endif
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
...@@ -742,7 +698,6 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { ...@@ -742,7 +698,6 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} }
static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) { static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) {
SRespStub stub; SRespStub stub;
int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
...@@ -877,7 +832,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -877,7 +832,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} else { } else {
ret = -1; ret = -1;
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncStr(pSyncNode->state),
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
goto _END; goto _END;
} }
...@@ -1603,7 +1558,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { ...@@ -1603,7 +1558,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// tla+ server vars // tla+ server vars
cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state); cJSON_AddNumberToObject(pRoot, "state", pSyncNode->state);
cJSON_AddStringToObject(pRoot, "state_str", syncUtilState2String(pSyncNode->state)); cJSON_AddStringToObject(pRoot, "state_str", syncStr(pSyncNode->state));
cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore)); cJSON_AddItemToObject(pRoot, "pRaftStore", raftStore2Json(pSyncNode->pRaftStore));
// tla+ candidate vars // tla+ candidate vars
...@@ -1743,7 +1698,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1743,7 +1698,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
"stgy:%d, bch:%d, " "stgy:%d, bch:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
...@@ -1767,7 +1722,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1767,7 +1722,7 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
"stgy:%d, bch:%d, " "stgy:%d, bch:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
...@@ -1821,7 +1776,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1821,7 +1776,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
"stgy:%d, bch:%d, " "stgy:%d, bch:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
...@@ -1843,7 +1798,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { ...@@ -1843,7 +1798,7 @@ inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) {
"stgy:%d, bch:%d, " "stgy:%d, bch:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s", "lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncStr(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex, pSyncNode->commitIndex, logBeginIndex, logLastIndex, pSyncNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy, snapshot.lastApplyTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->pRaftCfg->snapshotStrategy,
pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->pRaftCfg->batchSize, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex,
...@@ -1875,9 +1830,9 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { ...@@ -1875,9 +1830,9 @@ inline char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
", sby:%d, " ", sby:%d, "
"r-num:%d, " "r-num:%d, "
"lcfg:%" PRId64 ", chging:%d, rsto:%d", "lcfg:%" PRId64 ", chging:%d, rsto:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->vgId, syncStr(pSyncNode->state), pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex,
pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish); pSyncNode->pRaftCfg->lastConfigIndex, pSyncNode->changing, pSyncNode->restoreFinish);
return s; return s;
} }
......
...@@ -2413,7 +2413,7 @@ cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) { ...@@ -2413,7 +2413,7 @@ cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) {
cJSON_AddNumberToObject(pRoot, "fsmMeta.isWeak", pMsg->fsmMeta.isWeak); cJSON_AddNumberToObject(pRoot, "fsmMeta.isWeak", pMsg->fsmMeta.isWeak);
cJSON_AddNumberToObject(pRoot, "fsmMeta.code", pMsg->fsmMeta.code); cJSON_AddNumberToObject(pRoot, "fsmMeta.code", pMsg->fsmMeta.code);
cJSON_AddNumberToObject(pRoot, "fsmMeta.state", pMsg->fsmMeta.state); cJSON_AddNumberToObject(pRoot, "fsmMeta.state", pMsg->fsmMeta.state);
cJSON_AddStringToObject(pRoot, "fsmMeta.state.str", syncUtilState2String(pMsg->fsmMeta.state)); cJSON_AddStringToObject(pRoot, "fsmMeta.state.str", syncStr(pMsg->fsmMeta.state));
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->fsmMeta.seqNum); snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->fsmMeta.seqNum);
cJSON_AddStringToObject(pRoot, "fsmMeta.seqNum", u64buf); cJSON_AddStringToObject(pRoot, "fsmMeta.seqNum", u64buf);
......
...@@ -176,18 +176,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) { ...@@ -176,18 +176,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) {
return serialized; return serialized;
} }
const char* syncUtilState2String(ESyncState state) {
if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "follower";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) {
return "candidate";
} else if (state == TAOS_SYNC_STATE_LEADER) {
return "leader";
} else {
return "state_error";
}
}
bool syncUtilCanPrint(char c) { bool syncUtilCanPrint(char c) {
if (c >= 32 && c <= 126) { if (c >= 32 && c <= 126) {
return true; return true;
......
...@@ -47,8 +47,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -47,8 +47,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64
", term:%" PRIu64 " \n", ", term:%" PRIu64 " \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag,
cbMeta.flag, cbMeta.term); cbMeta.term);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} else { } else {
sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index);
...@@ -57,10 +57,10 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -57,10 +57,10 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf( snprintf(logBuf, sizeof(logBuf),
logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", "\n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
...@@ -68,8 +68,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -68,8 +68,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag);
cbMeta.flag);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
......
...@@ -45,8 +45,7 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -45,8 +45,7 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag);
cbMeta.flag);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} else { } else {
sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index);
...@@ -55,10 +54,10 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -55,10 +54,10 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf( snprintf(logBuf, sizeof(logBuf),
logBuf, sizeof(logBuf), "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", "\n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
...@@ -66,8 +65,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -66,8 +65,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n", "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s flag:%" PRIu64 "\n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag);
cbMeta.flag);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
......
...@@ -42,7 +42,7 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -42,7 +42,7 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} else { } else {
sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index);
...@@ -53,7 +53,7 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) ...@@ -53,7 +53,7 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta)
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
...@@ -61,7 +61,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -61,7 +61,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
......
...@@ -45,7 +45,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { ...@@ -45,7 +45,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} else { } else {
sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index); sTrace("==callback== ==CommitCb== do not apply again %" PRId64, cbMeta.index);
...@@ -56,7 +56,7 @@ void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) ...@@ -56,7 +56,7 @@ void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta)
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
...@@ -64,7 +64,7 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { ...@@ -64,7 +64,7 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
......
...@@ -44,8 +44,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -44,8 +44,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
", term:%" PRIu64 ", term:%" PRIu64
" " " "
"currentTerm:%" PRIu64 " \n", "currentTerm:%" PRIu64 " \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag,
cbMeta.flag, cbMeta.term, cbMeta.currentTerm); cbMeta.term, cbMeta.currentTerm);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
...@@ -56,8 +56,8 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) ...@@ -56,8 +56,8 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta)
", term:%" PRIu64 ", term:%" PRIu64
" " " "
"currentTerm:%" PRIu64 " \n", "currentTerm:%" PRIu64 " \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag,
cbMeta.flag, cbMeta.term, cbMeta.currentTerm); cbMeta.term, cbMeta.currentTerm);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
...@@ -68,8 +68,8 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { ...@@ -68,8 +68,8 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
", term:%" PRIu64 ", term:%" PRIu64
" " " "
"currentTerm:%" PRIu64 " \n", "currentTerm:%" PRIu64 " \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag,
cbMeta.flag, cbMeta.term, cbMeta.currentTerm); cbMeta.term, cbMeta.currentTerm);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
...@@ -168,8 +168,8 @@ void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbM ...@@ -168,8 +168,8 @@ void LeaderTransferCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbM
", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64 ", isWeak:%d, code:%d, state:%d %s, flag:%" PRIu64 ", term:%" PRIu64
" " " "
"currentTerm:%" PRIu64 " \n", "currentTerm:%" PRIu64 " \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state), cbMeta.flag,
cbMeta.flag, cbMeta.term, cbMeta.currentTerm); cbMeta.term, cbMeta.currentTerm);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} }
......
...@@ -35,7 +35,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { ...@@ -35,7 +35,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, "==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
...@@ -43,7 +43,7 @@ void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) ...@@ -43,7 +43,7 @@ void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta)
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, "==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
...@@ -51,7 +51,7 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { ...@@ -51,7 +51,7 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm, "==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册