未验证 提交 7b1833a6 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12733 from taosdata/fix/mnode

refactor: adjust vnode sync
...@@ -20,17 +20,23 @@ ...@@ -20,17 +20,23 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "tdef.h" #include "tdef.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
typedef uint64_t SyncNodeId; typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
typedef int64_t SyncIndex; typedef int64_t SyncIndex;
typedef uint64_t SyncTerm; typedef uint64_t SyncTerm;
typedef struct SSyncNode SSyncNode;
typedef struct SSyncBuffer SSyncBuffer;
typedef struct SWal SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry;
typedef enum { typedef enum {
TAOS_SYNC_STATE_FOLLOWER = 100, TAOS_SYNC_STATE_FOLLOWER = 100,
TAOS_SYNC_STATE_CANDIDATE = 101, TAOS_SYNC_STATE_CANDIDATE = 101,
...@@ -38,6 +44,17 @@ typedef enum { ...@@ -38,6 +44,17 @@ typedef enum {
TAOS_SYNC_STATE_ERROR = 103, TAOS_SYNC_STATE_ERROR = 103,
} ESyncState; } ESyncState;
typedef enum {
TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER = 1,
TAOS_SYNC_PROPOSE_OTHER_ERROR = 2,
} ESyncProposeCode;
typedef enum {
TAOS_SYNC_FSM_CB_SUCCESS = 0,
TAOS_SYNC_FSM_CB_OTHER_ERROR = 1,
} ESyncFsmCbCode;
typedef struct SNodeInfo { typedef struct SNodeInfo {
uint16_t nodePort; uint16_t nodePort;
char nodeFqdn[TSDB_FQDN_LEN]; char nodeFqdn[TSDB_FQDN_LEN];
...@@ -55,11 +72,6 @@ typedef struct SSnapshot { ...@@ -55,11 +72,6 @@ typedef struct SSnapshot {
SyncTerm lastApplyTerm; SyncTerm lastApplyTerm;
} SSnapshot; } SSnapshot;
typedef enum {
TAOS_SYNC_FSM_CB_SUCCESS = 0,
TAOS_SYNC_FSM_CB_OTHER_ERROR,
} ESyncFsmCbCode;
typedef struct SFsmCbMeta { typedef struct SFsmCbMeta {
SyncIndex index; SyncIndex index;
bool isWeak; bool isWeak;
...@@ -68,27 +80,15 @@ typedef struct SFsmCbMeta { ...@@ -68,27 +80,15 @@ typedef struct SFsmCbMeta {
uint64_t seqNum; uint64_t seqNum;
} SFsmCbMeta; } SFsmCbMeta;
struct SRpcMsg;
typedef struct SRpcMsg SRpcMsg;
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* data; void* data;
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
} SSyncFSM; } SSyncFSM;
struct SSyncRaftEntry;
typedef struct SSyncRaftEntry SSyncRaftEntry;
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
// abstract definition of log store in raft // abstract definition of log store in raft
// SWal implements it // SWal implements it
typedef struct SSyncLogStore { typedef struct SSyncLogStore {
...@@ -117,11 +117,6 @@ typedef struct SSyncLogStore { ...@@ -117,11 +117,6 @@ typedef struct SSyncLogStore {
} SSyncLogStore; } SSyncLogStore;
struct SWal;
typedef struct SWal SWal;
struct SEpSet;
typedef struct SEpSet SEpSet;
typedef struct SSyncInfo { typedef struct SSyncInfo {
SyncGroupId vgId; SyncGroupId vgId;
...@@ -130,10 +125,8 @@ typedef struct SSyncInfo { ...@@ -130,10 +125,8 @@ typedef struct SSyncInfo {
SWal* pWal; SWal* pWal;
SSyncFSM* pFsm; SSyncFSM* pFsm;
SMsgCb* msgcb; SMsgCb* msgcb;
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
int32_t syncInit(); int32_t syncInit();
...@@ -148,27 +141,8 @@ const char* syncGetMyRoleStr(int64_t rid); ...@@ -148,27 +141,8 @@ const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid); SyncTerm syncGetMyTerm(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncGetVgId(int64_t rid); int32_t syncGetVgId(int64_t rid);
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
typedef enum { bool syncEnvIsStart();
TAOS_SYNC_PROPOSE_SUCCESS = 0,
TAOS_SYNC_PROPOSE_NOT_LEADER,
TAOS_SYNC_PROPOSE_OTHER_ERROR,
} ESyncProposeCode;
int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart();
extern int32_t sDebugFlag;
//-----------------------------------------
struct SSyncNode;
typedef struct SSyncNode SSyncNode;
struct SSyncBuffer;
typedef struct SSyncBuffer SSyncBuffer;
//-----------------------------------------
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -20,9 +20,6 @@ ...@@ -20,9 +20,6 @@
extern "C" { extern "C" {
#endif #endif
#include "os.h"
#include "cJSON.h"
#include "trpc.h" #include "trpc.h"
// ------------------ ds ------------------- // ------------------ ds -------------------
...@@ -32,9 +29,6 @@ typedef struct SRaftId { ...@@ -32,9 +29,6 @@ typedef struct SRaftId {
} SRaftId; } SRaftId;
// ------------------ control ------------------- // ------------------ control -------------------
struct SSyncNode;
typedef struct SSyncNode SSyncNode;
SSyncNode* syncNodeAcquire(int64_t rid); SSyncNode* syncNodeAcquire(int64_t rid);
void syncNodeRelease(SSyncNode* pNode); void syncNodeRelease(SSyncNode* pNode);
......
...@@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg; ...@@ -45,20 +45,20 @@ typedef struct SVnodeCfg SVnodeCfg;
extern const SVnodeCfg vnodeCfgDefault; extern const SVnodeCfg vnodeCfgDefault;
int vnodeInit(int nthreads); int32_t vnodeInit(int32_t nthreads);
void vnodeCleanup(); void vnodeCleanup();
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodeClose(SVnode *pVnode); void vnodeClose(SVnode *pVnode);
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version); int32_t vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp); int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int vnodeValidateTableHash(SVnode *pVnode, char *tableFName); int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
int32_t vnodeStart(SVnode *pVnode); int32_t vnodeStart(SVnode *pVnode);
void vnodeStop(SVnode *pVnode); void vnodeStop(SVnode *pVnode);
...@@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry; ...@@ -74,8 +74,8 @@ typedef struct SMetaEntry SMetaEntry;
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderClear(SMetaReader *pReader); void metaReaderClear(SMetaReader *pReader);
int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid); int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int metaReadNext(SMetaReader *pReader); int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid); const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
#if 1 // refact APIs below (TODO) #if 1 // refact APIs below (TODO)
...@@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor; ...@@ -86,7 +86,7 @@ typedef struct SMTbCursor SMTbCursor;
SMTbCursor *metaOpenTbCursor(SMeta *pMeta); SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur); void metaCloseTbCursor(SMTbCursor *pTbCur);
int metaTbCursorNext(SMTbCursor *pTbCur); int32_t metaTbCursorNext(SMTbCursor *pTbCur);
#endif #endif
// tsdb // tsdb
...@@ -124,8 +124,8 @@ typedef struct STqReadHandle STqReadHandle; ...@@ -124,8 +124,8 @@ typedef struct STqReadHandle STqReadHandle;
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids); bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
...@@ -207,15 +207,15 @@ struct SMetaReader { ...@@ -207,15 +207,15 @@ struct SMetaReader {
SDecoder coder; SDecoder coder;
SMetaEntry me; SMetaEntry me;
void *pBuf; void *pBuf;
int szBuf; int32_t szBuf;
}; };
struct SMTbCursor { struct SMTbCursor {
TBC *pDbc; TBC *pDbc;
void *pKey; void *pKey;
void *pVal; void *pVal;
int kLen; int32_t kLen;
int vLen; int32_t vLen;
SMetaReader mr; SMetaReader mr;
}; };
......
...@@ -24,7 +24,6 @@ ...@@ -24,7 +24,6 @@
extern "C" { extern "C" {
#endif #endif
// vnodeDebug ====================
// clang-format off // clang-format off
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
...@@ -34,17 +33,17 @@ extern "C" { ...@@ -34,17 +33,17 @@ extern "C" {
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) #define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
// vnodeCfg ==================== // vnodeCfg.c
extern const SVnodeCfg vnodeCfgDefault; extern const SVnodeCfg vnodeCfgDefault;
int vnodeCheckCfg(const SVnodeCfg*); int32_t vnodeCheckCfg(const SVnodeCfg*);
int vnodeEncodeConfig(const void* pObj, SJson* pJson); int32_t vnodeEncodeConfig(const void* pObj, SJson* pJson);
int vnodeDecodeConfig(const SJson* pJson, void* pObj); int32_t vnodeDecodeConfig(const SJson* pJson, void* pObj);
// vnodeModule ==================== // vnodeModule.c
int vnodeScheduleTask(int (*execute)(void*), void* arg); int32_t vnodeScheduleTask(int32_t (*execute)(void*), void* arg);
// vnodeBufPool ==================== // vnodeBufPool.c
typedef struct SVBufPoolNode SVBufPoolNode; typedef struct SVBufPoolNode SVBufPoolNode;
struct SVBufPoolNode { struct SVBufPoolNode {
SVBufPoolNode* prev; SVBufPoolNode* prev;
...@@ -62,37 +61,29 @@ struct SVBufPool { ...@@ -62,37 +61,29 @@ struct SVBufPool {
SVBufPoolNode node; SVBufPoolNode node;
}; };
int vnodeOpenBufPool(SVnode* pVnode, int64_t size); int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
int vnodeCloseBufPool(SVnode* pVnode); int32_t vnodeCloseBufPool(SVnode* pVnode);
void vnodeBufPoolReset(SVBufPool* pPool); void vnodeBufPoolReset(SVBufPool* pPool);
// vnodeQuery ==================== // vnodeQuery.c
int vnodeQueryOpen(SVnode* pVnode); int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode);
int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit ==================== // vnodeCommit.c
int vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
int vnodeShouldCommit(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode);
int vnodeCommit(SVnode* pVnode); int32_t vnodeCommit(SVnode* pVnode);
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int vnodeSyncCommit(SVnode* pVnode); int32_t vnodeSyncCommit(SVnode* pVnode);
int vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode);
// vnodeCommit ==================== // vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncStart(SVnode* pVnode); void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeSyncSetMsgCb(SVnode* pVnode);
int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg);
void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
SSyncFSM* syncVnodeMakeFsm();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -180,7 +180,6 @@ void vnodeClose(SVnode *pVnode) { ...@@ -180,7 +180,6 @@ void vnodeClose(SVnode *pVnode) {
// start the sync timer after the queue is ready // start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) { int32_t vnodeStart(SVnode *pVnode) {
vnodeSyncSetMsgCb(pVnode);
vnodeSyncStart(pVnode); vnodeSyncStart(pVnode);
return 0; return 0;
} }
......
...@@ -13,71 +13,62 @@ ...@@ -13,71 +13,62 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "vnd.h" #include "vnd.h"
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
SSyncInfo syncInfo; static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
syncInfo.vgId = pVnode->config.vgId; static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode);
SSyncCfg *pCfg = &(syncInfo.syncCfg); static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
pCfg->replicaNum = pVnode->config.syncCfg.replicaNum; static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
pCfg->myIndex = pVnode->config.syncCfg.myIndex; static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo)); static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot);
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", path);
syncInfo.pWal = pVnode->pWal;
syncInfo.pFsm = syncVnodeMakeFsm(pVnode); int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
syncInfo.msgcb = NULL; SSyncInfo syncInfo = {
syncInfo.FpSendMsg = vnodeSyncSendMsg; .vgId = pVnode->config.vgId,
syncInfo.FpEqMsg = vnodeSyncEqMsg; .syncCfg = pVnode->config.syncCfg,
.pWal = pVnode->pWal,
.msgcb = NULL,
.FpSendMsg = vnodeSyncSendMsg,
.FpEqMsg = vnodeSyncEqMsg,
};
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", path, TD_DIRSEP);
syncInfo.pFsm = vnodeSyncMakeFsm(pVnode);
pVnode->sync = syncOpen(&syncInfo); pVnode->sync = syncOpen(&syncInfo);
assert(pVnode->sync > 0); if (pVnode->sync <= 0) {
vError("vgId:%d, failed to open sync since %s", pVnode->config.vgId, terrstr());
return -1;
}
// for test
setPingTimerMS(pVnode->sync, 3000); setPingTimerMS(pVnode->sync, 3000);
setElectTimerMS(pVnode->sync, 500); setElectTimerMS(pVnode->sync, 500);
setHeartbeatTimerMS(pVnode->sync, 100); setHeartbeatTimerMS(pVnode->sync, 100);
return 0; return 0;
} }
int32_t vnodeSyncStart(SVnode *pVnode) { void vnodeSyncStart(SVnode *pVnode) {
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
syncStart(pVnode->sync); syncStart(pVnode->sync);
return 0;
} }
void vnodeSyncClose(SVnode *pVnode) { void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
// stop by ref id
syncStop(pVnode->sync);
}
void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); }
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); } int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
pMsg->info.noResp = 1;
return tmsgSendReq(pEpSet, pMsg);
}
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SVnode *pVnode = (SVnode *)(pFsm->data);
vnodeGetSnapshot(pVnode, pSnapshot);
/*
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = 0;
pSnapshot->lastApplyTerm = 0;
*/
int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
vnodeGetSnapshot(pFsm->data, pSnapshot);
return 0; return 0;
} }
void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
if (pFsm->FpGetSnapshot != NULL) { if (pFsm->FpGetSnapshot != NULL) {
SSnapshot snapshot; SSnapshot snapshot = {0};
pFsm->FpGetSnapshot(pFsm, &snapshot); pFsm->FpGetSnapshot(pFsm, &snapshot);
beginIndex = snapshot.lastApplyIndex; beginIndex = snapshot.lastApplyIndex;
} }
...@@ -128,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb ...@@ -128,7 +119,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
} }
} }
void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
...@@ -136,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta ...@@ -136,19 +127,19 @@ void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256]; char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
SSyncFSM *syncVnodeMakeFsm(SVnode *pVnode) { SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode; pFsm->data = pVnode;
pFsm->FpCommitCb = vnodeSyncCommitCb; pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpPreCommitCb = vnodeSyncPreCommitCb; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackCb; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshotCb; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
return pFsm; return pFsm;
} }
\ No newline at end of file
...@@ -20,135 +20,41 @@ ...@@ -20,135 +20,41 @@
extern "C" { extern "C" {
#endif #endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "sync.h" #include "sync.h"
#include "syncTools.h" #include "syncTools.h"
#include "taosdef.h"
#include "tglobal.h"
#include "tlog.h" #include "tlog.h"
#include "ttimer.h" #include "ttimer.h"
#define sFatal(...) \ // clang-format off
{ \ #define sFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
if (sDebugFlag & DEBUG_FATAL) { \ #define sError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
taosPrintLog("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ #define sWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
} \ #define sInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
} #define sDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
#define sError(...) \ #define sTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
{ \ #define sFatalLong(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
if (sDebugFlag & DEBUG_ERROR) { \ #define sErrorLong(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
taosPrintLog("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ #define sWarnLong(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
} \ #define sInfoLong(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLongString("SYN ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
} #define sDebugLong(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLongString("SYN ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); }} while(0)
#define sWarn(...) \ #define sTraceLong(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLongString("SYN ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); }} while(0)
{ \ // clang-format on
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLog("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ typedef struct SyncTimeout SyncTimeout;
} \ typedef struct SyncClientRequest SyncClientRequest;
} typedef struct SyncPing SyncPing;
#define sInfo(...) \ typedef struct SyncPingReply SyncPingReply;
{ \ typedef struct SyncRequestVote SyncRequestVote;
if (sDebugFlag & DEBUG_INFO) { \ typedef struct SyncRequestVoteReply SyncRequestVoteReply;
taosPrintLog("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \ typedef struct SyncAppendEntries SyncAppendEntries;
} \
}
#define sDebug(...) \
{ \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
} \
}
#define sTrace(...) \
{ \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLog("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
} \
}
#define sFatalLong(...) \
{ \
if (sDebugFlag & DEBUG_FATAL) { \
taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
} \
}
#define sErrorLong(...) \
{ \
if (sDebugFlag & DEBUG_ERROR) { \
taosPrintLongString("SYN ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
}
#define sWarnLong(...) \
{ \
if (sDebugFlag & DEBUG_WARN) { \
taosPrintLongString("SYN WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
}
#define sInfoLong(...) \
{ \
if (sDebugFlag & DEBUG_INFO) { \
taosPrintLongString("SYN INFO ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
}
#define sDebugLong(...) \
{ \
if (sDebugFlag & DEBUG_DEBUG) { \
taosPrintLongString("SYN DEBUG ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__); \
} \
}
#define sTraceLong(...) \
{ \
if (sDebugFlag & DEBUG_TRACE) { \
taosPrintLongString("SYN TRACE ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__); \
} \
}
struct SyncTimeout;
typedef struct SyncTimeout SyncTimeout;
struct SyncClientRequest;
typedef struct SyncClientRequest SyncClientRequest;
struct SyncPing;
typedef struct SyncPing SyncPing;
struct SyncPingReply;
typedef struct SyncPingReply SyncPingReply;
struct SyncRequestVote;
typedef struct SyncRequestVote SyncRequestVote;
struct SyncRequestVoteReply;
typedef struct SyncRequestVoteReply SyncRequestVoteReply;
struct SyncAppendEntries;
typedef struct SyncAppendEntries SyncAppendEntries;
struct SyncAppendEntriesReply;
typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; typedef struct SyncAppendEntriesReply SyncAppendEntriesReply;
typedef struct SSyncEnv SSyncEnv;
struct SSyncEnv; typedef struct SRaftStore SRaftStore;
typedef struct SSyncEnv SSyncEnv; typedef struct SVotesGranted SVotesGranted;
typedef struct SVotesRespond SVotesRespond;
struct SRaftStore; typedef struct SSyncIndexMgr SSyncIndexMgr;
typedef struct SRaftStore SRaftStore; typedef struct SRaftCfg SRaftCfg;
typedef struct SSyncRespMgr SSyncRespMgr;
struct SVotesGranted;
typedef struct SVotesGranted SVotesGranted;
struct SVotesRespond;
typedef struct SVotesRespond SVotesRespond;
struct SSyncIndexMgr;
typedef struct SSyncIndexMgr SSyncIndexMgr;
struct SRaftCfg;
typedef struct SRaftCfg SRaftCfg;
struct SSyncRespMgr;
typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncNode { typedef struct SSyncNode {
// init by SSyncInfo // init by SSyncInfo
......
...@@ -674,10 +674,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp ...@@ -674,10 +674,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
SEpSet epSet; SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet); syncUtilraftId2EpSet(destRaftId, &epSet);
if (pSyncNode->FpSendMsg != NULL) { if (pSyncNode->FpSendMsg != NULL) {
pMsg->info.noResp = 1;
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg); pSyncNode->FpSendMsg(&epSet, pMsg);
} else { } else {
sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL"); sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
...@@ -689,10 +689,10 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S ...@@ -689,10 +689,10 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
SEpSet epSet; SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet); syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->FpSendMsg != NULL) { if (pSyncNode->FpSendMsg != NULL) {
pMsg->info.noResp = 1;
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, pMsg); pSyncNode->FpSendMsg(&epSet, pMsg);
} else { } else {
sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL"); sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
......
...@@ -97,11 +97,12 @@ int main(int argc, char** argv) { ...@@ -97,11 +97,12 @@ int main(int argc, char** argv) {
for (int i = 0; i < 10; ++i) { for (int i = 0; i < 10; ++i) {
SyncPingReply* pSyncMsg = SyncPingReply* pSyncMsg =
syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest"); syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncIOSendMsgTest");
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
SEpSet epSet; SEpSet epSet;
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
rpcMsg.info.noResp = 1;
pSyncNode->FpSendMsg(&epSet, &rpcMsg); pSyncNode->FpSendMsg(&epSet, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册