提交 d35028ae 编写于 作者: S Shengliang Guan

enh: refact sync callback func

上级 f4dc7d07
...@@ -193,9 +193,13 @@ typedef struct SSyncInfo { ...@@ -193,9 +193,13 @@ typedef struct SSyncInfo {
SWal* pWal; SWal* pWal;
SSyncFSM* pFsm; SSyncFSM* pFsm;
SMsgCb* msgcb; SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t pingMs;
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t electMs;
int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t heartbeatMs;
int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
int32_t syncInit(); int32_t syncInit();
...@@ -228,6 +232,8 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm); ...@@ -228,6 +232,8 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
const char* syncUtilState2String(ESyncState state);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,20 +28,11 @@ typedef struct SRaftId { ...@@ -28,20 +28,11 @@ typedef struct SRaftId {
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);
char* sync2SimpleStr(int64_t rid); char* sync2SimpleStr(int64_t rid);
// set timer ms
void setPingTimerMS(int64_t rid, int32_t pingTimerMS);
void setElectTimerMS(int64_t rid, int32_t electTimerMS);
void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS);
// for compatibility, the same as syncPropose // for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak); int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// utils
const char* syncUtilState2String(ESyncState state);
// ------------------ for debug ------------------- // ------------------ for debug -------------------
void syncRpcMsgPrint(SRpcMsg* pMsg); void syncRpcMsgPrint(SRpcMsg* pMsg);
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg); void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
#include "sdb.h" #include "sdb.h"
#include "sync.h" #include "sync.h"
#include "syncTools.h"
#include "tcache.h" #include "tcache.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
......
...@@ -202,9 +202,12 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -202,9 +202,12 @@ int32_t mndInitSync(SMnode *pMnode) {
.vgId = 1, .vgId = 1,
.pWal = pMnode->pWal, .pWal = pMnode->pWal,
.msgcb = NULL, .msgcb = NULL,
.FpSendMsg = mndSyncSendMsg, .syncSendMSg = mndSyncSendMsg,
.FpEqMsg = mndSyncEqMsg, .syncEqMsg = mndSyncEqMsg,
.FpEqCtrlMsg = NULL, .syncEqCtrlMsg = NULL,
.pingMs = 5000,
.electMs = 3000,
.heartbeatMs = 500,
}; };
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
...@@ -228,11 +231,6 @@ int32_t mndInitSync(SMnode *pMnode) { ...@@ -228,11 +231,6 @@ int32_t mndInitSync(SMnode *pMnode) {
return -1; return -1;
} }
// decrease election timer
setPingTimerMS(pMgmt->sync, 5000);
setElectTimerMS(pMgmt->sync, 3000);
setHeartbeatTimerMS(pMgmt->sync, 500);
mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync); mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
return 0; return 0;
} }
...@@ -303,7 +301,6 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { ...@@ -303,7 +301,6 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
void mndSyncStart(SMnode *pMnode) { void mndSyncStart(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
syncStart(pMgmt->sync); syncStart(pMgmt->sync);
mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync); mInfo("vgId:1, sync started, id:%" PRId64, pMgmt->sync);
} }
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
#define _TD_VND_H_ #define _TD_VND_H_
#include "sync.h" #include "sync.h"
#include "syncTools.h"
#include "ttrace.h" #include "ttrace.h"
#include "vnodeInt.h" #include "vnodeInt.h"
......
...@@ -503,9 +503,12 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -503,9 +503,12 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
.syncCfg = pVnode->config.syncCfg, .syncCfg = pVnode->config.syncCfg,
.pWal = pVnode->pWal, .pWal = pVnode->pWal,
.msgcb = NULL, .msgcb = NULL,
.FpSendMsg = vnodeSyncSendMsg, .syncSendMSg = vnodeSyncSendMsg,
.FpEqMsg = vnodeSyncEqMsg, .syncEqMsg = vnodeSyncEqMsg,
.FpEqCtrlMsg = vnodeSyncEqCtrlMsg, .syncEqCtrlMsg = vnodeSyncEqCtrlMsg,
.pingMs = 5000,
.electMs = 4000,
.heartbeatMs = 700,
}; };
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
...@@ -524,15 +527,11 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -524,15 +527,11 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
return -1; return -1;
} }
setPingTimerMS(pVnode->sync, 5000);
setElectTimerMS(pVnode->sync, 4000);
setHeartbeatTimerMS(pVnode->sync, 700);
return 0; return 0;
} }
void vnodeSyncStart(SVnode *pVnode) { void vnodeSyncStart(SVnode *pVnode) {
vDebug("vgId:%d, start sync", pVnode->config.vgId); vDebug("vgId:%d, start sync", pVnode->config.vgId);
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync); syncStart(pVnode->sync);
} }
......
...@@ -107,9 +107,9 @@ typedef struct SSyncNode { ...@@ -107,9 +107,9 @@ typedef struct SSyncNode {
// sync io // sync io
SWal* pWal; SWal* pWal;
const SMsgCb* msgcb; const SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*FpEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*syncEqCtrlMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
// init internal // init internal
SNodeInfo myNodeInfo; SNodeInfo myNodeInfo;
......
...@@ -56,6 +56,12 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) { ...@@ -56,6 +56,12 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) {
return -1; return -1;
} }
pSyncNode->pingBaseLine = pSyncInfo->pingMs;
pSyncNode->pingTimerMS = pSyncInfo->pingMs;
pSyncNode->electBaseLine = pSyncInfo->electMs;
pSyncNode->hbBaseLine = pSyncInfo->heartbeatMs;
pSyncNode->heartbeatTimerMS = pSyncInfo->heartbeatMs;
pSyncNode->msgcb = pSyncInfo->msgcb;
return pSyncNode->rid; return pSyncNode->rid;
} }
...@@ -747,18 +753,6 @@ static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandl ...@@ -747,18 +753,6 @@ static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandl
sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
} }
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
sTrace("syncSetQ get pSyncNode is NULL, rid:%" PRId64, rid);
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->msgcb = msgcb;
syncNodeRelease(pSyncNode);
}
char* sync2SimpleStr(int64_t rid) { char* sync2SimpleStr(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
...@@ -772,41 +766,6 @@ char* sync2SimpleStr(int64_t rid) { ...@@ -772,41 +766,6 @@ char* sync2SimpleStr(int64_t rid) {
return s; return s;
} }
void setPingTimerMS(int64_t rid, int32_t pingTimerMS) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->pingBaseLine = pingTimerMS;
pSyncNode->pingTimerMS = pingTimerMS;
syncNodeRelease(pSyncNode);
}
void setElectTimerMS(int64_t rid, int32_t electTimerMS) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->electBaseLine = electTimerMS;
syncNodeRelease(pSyncNode);
}
void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return;
}
ASSERT(rid == pSyncNode->rid);
pSyncNode->hbBaseLine = hbTimerMS;
pSyncNode->heartbeatTimerMS = hbTimerMS;
syncNodeRelease(pSyncNode);
}
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = syncNodeAcquire(rid); SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
...@@ -903,7 +862,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { ...@@ -903,7 +862,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
} }
} else { } else {
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { if (pSyncNode->syncEqMsg != NULL && (*pSyncNode->syncEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
ret = 0; ret = 0;
} else { } else {
ret = -1; ret = -1;
...@@ -1034,9 +993,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -1034,9 +993,9 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->pWal = pSyncInfo->pWal;
pSyncNode->msgcb = pSyncInfo->msgcb; pSyncNode->msgcb = pSyncInfo->msgcb;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->syncEqMsg = pSyncInfo->syncEqMsg;
pSyncNode->FpEqCtrlMsg = pSyncInfo->FpEqCtrlMsg; pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg;
// init raft config // init raft config
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
...@@ -1552,12 +1511,12 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) { ...@@ -1552,12 +1511,12 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet; SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet); syncUtilraftId2EpSet(destRaftId, &epSet);
if (pSyncNode->FpSendMsg != NULL) { if (pSyncNode->syncSendMSg != NULL) {
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1; pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg); pSyncNode->syncSendMSg(&epSet, pMsg);
} else { } else {
sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId); sError("vgId:%d, sync send msg by id error, fp-send-msg is null", pSyncNode->vgId);
return -1; return -1;
...@@ -1569,12 +1528,12 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp ...@@ -1569,12 +1528,12 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet; SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet); syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->FpSendMsg != NULL) { if (pSyncNode->syncSendMSg != NULL) {
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1; pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg); pSyncNode->syncSendMSg(&epSet, pMsg);
} else { } else {
sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId); sError("vgId:%d, sync send msg by info error, fp-send-msg is null", pSyncNode->vgId);
} }
...@@ -1598,13 +1557,13 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { ...@@ -1598,13 +1557,13 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncSendMSg);
cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); cJSON_AddStringToObject(pRoot, "syncSendMSg", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
cJSON_AddStringToObject(pRoot, "queue", u64buf); cJSON_AddStringToObject(pRoot, "queue", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->syncEqMsg);
cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); cJSON_AddStringToObject(pRoot, "syncEqMsg", u64buf);
// init internal // init internal
cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo); cJSON* pMe = syncUtilNodeInfo2Json(&pSyncNode->myNodeInfo);
...@@ -2617,8 +2576,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -2617,8 +2576,8 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->syncEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
...@@ -2626,7 +2585,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -2626,7 +2585,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
return; return;
} }
} else { } else {
sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); sTrace("syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL");
} }
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
...@@ -2651,8 +2610,8 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -2651,8 +2610,8 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
pSyncNode->vgId, pSyncNode); pSyncNode->vgId, pSyncNode);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->syncEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
...@@ -2668,7 +2627,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -2668,7 +2627,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
} while (0); } while (0);
} else { } else {
sTrace("syncNodeEqElectTimer FpEqMsg is NULL"); sTrace("syncNodeEqElectTimer syncEqMsg is NULL");
} }
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
...@@ -2700,8 +2659,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { ...@@ -2700,8 +2659,8 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->syncEqMsg != NULL) {
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
...@@ -2709,7 +2668,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { ...@@ -2709,7 +2668,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
return; return;
} }
} else { } else {
sError("vgId:%d, enqueue msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); sError("vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
} }
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
...@@ -2756,8 +2715,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -2756,8 +2715,8 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
// eq msg // eq msg
#if 0 #if 0
if (pSyncNode->FpEqCtrlMsg != NULL) { if (pSyncNode->syncEqCtrlMsg != NULL) {
int32_t code = pSyncNode->FpEqCtrlMsg(pSyncNode->msgcb, &rpcMsg); int32_t code = pSyncNode->syncEqCtrlMsg(pSyncNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code); sError("vgId:%d, sync ctrl enqueue timer msg error, code:%d", pSyncNode->vgId, code);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
...@@ -2765,7 +2724,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { ...@@ -2765,7 +2724,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) {
return; return;
} }
} else { } else {
sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. FpEqMsg) not set.", pSyncNode->vgId); sError("vgId:%d, enqueue ctrl msg cb ptr (i.e. syncEqMsg) not set.", pSyncNode->vgId);
} }
#endif #endif
...@@ -2805,10 +2764,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { ...@@ -2805,10 +2764,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
if (ths->FpEqMsg != NULL) { if (ths->syncEqMsg != NULL) {
ths->FpEqMsg(ths->msgcb, &rpcMsg); ths->syncEqMsg(ths->msgcb, &rpcMsg);
} else { } else {
sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); sTrace("syncNodeEqNoop pSyncNode->syncEqMsg is NULL");
} }
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
...@@ -2919,8 +2878,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { ...@@ -2919,8 +2878,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) {
SRpcMsg rpcMsgLocalCmd; SRpcMsg rpcMsgLocalCmd;
syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd); syncLocalCmd2RpcMsg(pSyncMsg, &rpcMsgLocalCmd);
if (ths->FpEqMsg != NULL && ths->msgcb != NULL) { if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->FpEqMsg(ths->msgcb, &rpcMsgLocalCmd); int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
if (code != 0) { if (code != 0) {
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont); rpcFreeCont(rpcMsgLocalCmd.pCont);
......
...@@ -145,16 +145,17 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { ...@@ -145,16 +145,17 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
taosArrayPush(delIndexArray, pSeqNum); taosArrayPush(delIndexArray, pSeqNum);
cnt++; cnt++;
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {
cbMeta.index = SYNC_INDEX_INVALID; cbMeta.index = SYNC_INDEX_INVALID,
cbMeta.lastConfigIndex = SYNC_INDEX_INVALID; cbMeta.lastConfigIndex = SYNC_INDEX_INVALID,
cbMeta.isWeak = false; cbMeta.isWeak = false,
cbMeta.code = TSDB_CODE_SYN_TIMEOUT; cbMeta.code = TSDB_CODE_SYN_TIMEOUT,
cbMeta.state = pSyncNode->state; cbMeta.state = pSyncNode->state,
cbMeta.seqNum = *pSeqNum; cbMeta.seqNum = *pSeqNum,
cbMeta.term = SYNC_TERM_INVALID; cbMeta.term = SYNC_TERM_INVALID,
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm,
cbMeta.flag = 0; cbMeta.flag = 0,
};
pStub->rpcMsg.pCont = NULL; pStub->rpcMsg.pCont = NULL;
pStub->rpcMsg.contLen = 0; pStub->rpcMsg.contLen = 0;
......
...@@ -177,18 +177,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) { ...@@ -177,18 +177,6 @@ char* syncUtilRaftId2Str(const SRaftId* p) {
} }
const char* syncUtilState2String(ESyncState state) { const char* syncUtilState2String(ESyncState state) {
/*
if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "TAOS_SYNC_STATE_FOLLOWER";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) {
return "TAOS_SYNC_STATE_CANDIDATE";
} else if (state == TAOS_SYNC_STATE_LEADER) {
return "TAOS_SYNC_STATE_LEADER";
} else {
return "TAOS_SYNC_STATE_UNKNOWN";
}
*/
if (state == TAOS_SYNC_STATE_FOLLOWER) { if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "follower"; return "follower";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) { } else if (state == TAOS_SYNC_STATE_CANDIDATE) {
......
...@@ -195,8 +195,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* ...@@ -195,8 +195,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;
......
...@@ -120,8 +120,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* ...@@ -120,8 +120,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;
......
...@@ -45,8 +45,8 @@ SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWa ...@@ -45,8 +45,8 @@ SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWa
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = NULL; syncInfo.pFsm = NULL;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;
......
...@@ -32,8 +32,8 @@ SSyncNode *pSyncNode; ...@@ -32,8 +32,8 @@ SSyncNode *pSyncNode;
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -25,8 +25,8 @@ SSyncFSM* pFsm; ...@@ -25,8 +25,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
...@@ -97,7 +97,7 @@ int main(int argc, char** argv) { ...@@ -97,7 +97,7 @@ int main(int argc, char** argv) {
SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest"); SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest");
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); pSyncNode->syncEqMsg(pSyncNode->msgcb, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }
......
...@@ -26,8 +26,8 @@ SSyncFSM* pFsm; ...@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
...@@ -103,7 +103,7 @@ int main(int argc, char** argv) { ...@@ -103,7 +103,7 @@ int main(int argc, char** argv) {
SEpSet epSet; SEpSet epSet;
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
rpcMsg.info.noResp = 1; rpcMsg.info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, &rpcMsg); pSyncNode->syncSendMSg(&epSet, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }
......
...@@ -26,8 +26,8 @@ SSyncFSM* pFsm; ...@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test");
......
...@@ -26,8 +26,8 @@ SSyncFSM* pFsm; ...@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -26,8 +26,8 @@ SSyncFSM* pFsm; ...@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -26,8 +26,8 @@ SSyncFSM* pFsm; ...@@ -26,8 +26,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -100,8 +100,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* ...@@ -100,8 +100,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;
......
...@@ -87,8 +87,8 @@ void initFsm() { ...@@ -87,8 +87,8 @@ void initFsm() {
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
...@@ -204,7 +204,7 @@ int main(int argc, char **argv) { ...@@ -204,7 +204,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pSyncClientRequest = pMsg1; SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }
......
...@@ -217,8 +217,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* ...@@ -217,8 +217,8 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal; syncInfo.pWal = pWal;
......
...@@ -28,8 +28,8 @@ SSyncNode* pSyncNode; ...@@ -28,8 +28,8 @@ SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -28,8 +28,8 @@ SSyncNode* pSyncNode; ...@@ -28,8 +28,8 @@ SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -65,8 +65,8 @@ void initFsm() { ...@@ -65,8 +65,8 @@ void initFsm() {
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.msgcb = &gSyncIO->msgcb; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.syncSendMSg = syncIOSendMsg;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.syncEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
...@@ -179,7 +179,7 @@ int main(int argc, char **argv) { ...@@ -179,7 +179,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pSyncClientRequest = pMsg1; SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg); gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册