提交 45357229 编写于 作者: M Minghao Li

enh(sync) sync/mnode integration, syncStart async -> sync

上级 97b1e95a
......@@ -87,6 +87,7 @@ typedef struct SSyncFSM {
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRestoreFinish)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
} SSyncFSM;
......@@ -119,7 +120,6 @@ typedef struct SSyncLogStore {
} SSyncLogStore;
typedef struct SSyncInfo {
SyncGroupId vgId;
SSyncCfg syncCfg;
......
......@@ -36,8 +36,8 @@ typedef struct SSyncIO {
STaosQueue *pMsgQ;
STaosQset * pQset;
TdThread consumerTid;
void *serverRpc;
void *clientRpc;
void * serverRpc;
void * clientRpc;
SEpSet myAddr;
SMsgCb msgcb;
......
......@@ -147,6 +147,11 @@ typedef struct SSyncNode {
// tools
SSyncRespMgr* pSyncRespMgr;
// restore state
bool restoreFinish;
sem_t restoreSem;
SSnapshot* pSnapshot;
} SSyncNode;
// open/close --------------
......
......@@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
// if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
......@@ -335,6 +334,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
bool needExecute = true;
if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) {
needExecute = false;
}
if (needExecute) {
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
// config change
......@@ -351,6 +359,16 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
}
// restore finish
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
ths->pFsm->FpRestoreFinish(ths->pFsm);
ths->restoreFinish = true;
tsem_post(&ths->restoreSem);
}
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
......
......@@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
// if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta;
cbMeta.index = pEntry->index;
......@@ -110,9 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
cbMeta.code = 0;
cbMeta.state = pSyncNode->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
cbMeta.term = pEntry->term;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
bool needExecute = true;
if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) {
needExecute = false;
}
if (needExecute) {
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
}
}
// config change
......@@ -129,6 +136,16 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
}
// restore finish
if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) {
if (pSyncNode->restoreFinish == false) {
pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm);
pSyncNode->restoreFinish = true;
tsem_post(&pSyncNode->restoreSem);
}
}
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
}
......
......@@ -242,7 +242,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
return ret;
}
void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) {
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
......@@ -492,6 +492,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0);
assert(pSyncNode->pSyncRespMgr != NULL);
// restore state
pSyncNode->restoreFinish = false;
pSyncNode->pSnapshot = NULL;
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot);
}
tsem_init(&(pSyncNode->restoreSem), 0, 0);
// start in syncNodeStart
// start raft
// syncNodeBecomeFollower(pSyncNode);
......@@ -511,6 +520,8 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// use this now
syncNodeAppendNoop(pSyncNode);
syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica
tsem_wait(&pSyncNode->restoreSem);
return;
}
......@@ -520,6 +531,8 @@ void syncNodeStart(SSyncNode* pSyncNode) {
int32_t ret = 0;
// ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
tsem_wait(&pSyncNode->restoreSem);
}
void syncNodeStartStandBy(SSyncNode* pSyncNode) {
......@@ -556,6 +569,12 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree(pSyncNode->pFsm);
}
if (pSyncNode->pSnapshot != NULL) {
taosMemoryFree(pSyncNode->pSnapshot);
}
tsem_destroy(&pSyncNode->restoreSem);
// free memory in syncFreeNode
// taosMemoryFree(pSyncNode);
}
......
......@@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) {
}
int main(int argc, char **argv) {
sprintf(tsTempDir, "%s", ".");
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册