提交 0ffc6c46 编写于 作者: M Minghao Li

refactor(sync): add snapshot2 interface

上级 91238e82
...@@ -121,7 +121,7 @@ typedef struct SSyncFSM { ...@@ -121,7 +121,7 @@ typedef struct SSyncFSM {
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
......
...@@ -117,7 +117,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM ...@@ -117,7 +117,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
} }
} }
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
mDebug("start to read snapshot from sdb"); mDebug("start to read snapshot from sdb");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
......
...@@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
} }
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode * pVnode = pInfo->ahandle; SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
...@@ -199,7 +199,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -199,7 +199,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
} }
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode * pVnode = pInfo->ahandle; SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
int32_t code = 0; int32_t code = 0;
SRpcMsg *pMsg = NULL; SRpcMsg *pMsg = NULL;
...@@ -240,7 +240,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -240,7 +240,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
STraceId *trace = &pMsg->info.traceId; STraceId *trace = &pMsg->info.traceId;
do { do {
char * syncNodeStr = sync2SimpleStr(pVnode->sync); char *syncNodeStr = sync2SimpleStr(pVnode->sync);
static int64_t vndTick = 0; static int64_t vndTick = 0;
if (++vndTick % 10 == 1) { if (++vndTick % 10 == 1) {
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
...@@ -375,7 +375,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon ...@@ -375,7 +375,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
} }
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode * pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshot snapshot = {0}; SSnapshot snapshot = {0};
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
char logBuf[256] = {0}; char logBuf[256] = {0};
...@@ -409,7 +409,7 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta ...@@ -409,7 +409,7 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
} }
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; } static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { return 0; }
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; } static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
......
...@@ -120,7 +120,7 @@ static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncInde ...@@ -120,7 +120,7 @@ static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncInde
void* pReader = NULL; void* pReader = NULL;
SReaderParam readerParam = {.start = beginIndex, .end = endIndex}; SReaderParam readerParam = {.start = beginIndex, .end = endIndex};
ths->pFsm->FpSnapshotStartRead(ths->pFsm, &pReader); ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
snapshotSenderStart(pSender, snapshot, pReader); snapshotSenderStart(pSender, snapshot, pReader);
......
...@@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { ...@@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0; return 0;
} }
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) {
*ppReader = (void*)0xABCD; *ppReader = (void*)0xABCD;
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader); snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
......
...@@ -25,7 +25,7 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) ...@@ -25,7 +25,7 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; }
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; } int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { return 0; }
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
......
...@@ -74,7 +74,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { ...@@ -74,7 +74,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0; return 0;
} }
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) {
*ppReader = (void*)0xABCD; *ppReader = (void*)0xABCD;
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader); snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册