提交 757a1248 编写于 作者: M Minghao Li

fix: send snapshot

上级 ae06cc36
......@@ -157,6 +157,7 @@ typedef struct SSyncNode {
// SSnapshot* pSnapshot;
SSyncSnapshotSender* senders[TSDB_MAX_REPLICA];
SSyncSnapshotReceiver* receivers[TSDB_MAX_REPLICA];
SSyncSnapshotReceiver* pNewNodeReceiver;
} SSyncNode;
......
......@@ -319,6 +319,23 @@ static void *syncIOConsumerFunc(void *param) {
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) {
if (io->FpOnSyncSnapshotSend != NULL) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncSnapshotSend(io->pSyncNode, pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) {
if (io->FpOnSyncSnapshotRsp != NULL) {
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL);
io->FpOnSyncSnapshotRsp(io->pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
}
} else {
sTrace("unknown msgType:%d, no operator", pRpcMsg->msgType);
}
......
......@@ -601,6 +601,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
(pSyncNode->receivers)[i] = pReceiver;
}
pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, 100);
// start in syncNodeStart
// start raft
// syncNodeBecomeFollower(pSyncNode);
......@@ -611,6 +613,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
void syncNodeStart(SSyncNode* pSyncNode) {
// start raft
if (pSyncNode->replicaNum == 1) {
raftStoreNextTerm(pSyncNode->pRaftStore);
syncNodeBecomeLeader(pSyncNode);
syncNodeLog2("==state change become leader immediately==", pSyncNode);
......@@ -706,6 +709,11 @@ void syncNodeClose(SSyncNode* pSyncNode) {
}
}
if (pSyncNode->pNewNodeReceiver != NULL) {
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
pSyncNode->pNewNodeReceiver = NULL;
}
/*
if (pSyncNode->pSnapshot != NULL) {
taosMemoryFree(pSyncNode->pSnapshot);
......@@ -1294,7 +1302,7 @@ int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, Sy
// get pre index and term of "index"
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
ASSERT(index >= SYNC_INDEX_BEGIN);
ASSERT(!syncNodeIsIndexInSnapshot(pSyncNode, index));
// ASSERT(!syncNodeIsIndexInSnapshot(pSyncNode, index));
int ret = 0;
SyncIndex preIndex = index - 1;
......
......@@ -65,6 +65,16 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
pRoot = syncAppendEntriesReply2Json(pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend* pSyncMsg = syncSnapshotSendDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncSnapshotSend2Json(pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) {
SyncSnapshotRsp* pSyncMsg = syncSnapshotRspDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
pRoot = syncSnapshotRsp2Json(pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) {
pRoot = cJSON_CreateObject();
char* s;
......
......@@ -61,6 +61,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
// set prevLogIndex
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
SyncIndex preLogIndex = nextIndex - 1;
// set preLogTerm
......@@ -127,13 +128,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
SRaftId* pDestId = &(pSyncNode->peersId[i]);
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
SyncIndex preLogIndex;
SyncTerm preLogTerm;
ret = syncNodeGetPreIndexTerm(pSyncNode, nextIndex, &preLogIndex, &preLogTerm);
ASSERT(ret == 0);
// batch optimized
// SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex);
......@@ -181,6 +178,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
snapshotSenderStart(pSender);
} else {
ret = syncNodeGetPreIndexTerm(pSyncNode, nextIndex, &preLogIndex, &preLogTerm);
ASSERT(ret == 0);
SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex);
if (pEntry != NULL) {
......
......@@ -407,7 +407,11 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pReceiver = (pSyncNode->receivers)[i];
}
}
ASSERT(pReceiver != NULL);
// add new replica
if (pReceiver == NULL) {
pReceiver = pSyncNode->pNewNodeReceiver;
}
bool needRsp = false;
......@@ -430,7 +434,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
snapshotReceiverStop(pReceiver);
pReceiver->ack = pMsg->seq;
needRsp = true;
needRsp = false;
char *msgStr = syncSnapshotSend2Str(pMsg);
sTrace("snapshot recv end ack:%d recv msg:%s", pReceiver->ack, msgStr);
......
......@@ -20,6 +20,7 @@ uint16_t gPorts[] = {7010, 7110, 7210, 7310, 7410};
const char* gDir = "./syncReplicateTest";
int32_t gVgId = 1234;
SyncIndex gSnapshotLastApplyIndex;
SyncIndex gSnapshotLastApplyTerm;
void init() {
int code = walInit();
......@@ -44,8 +45,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
if (cbMeta.index > beginIndex) {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
"==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, flag:%lu, term:%lu \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
cbMeta.flag, cbMeta.term);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} else {
sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index);
......@@ -71,7 +73,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex;
pSnapshot->lastApplyTerm = 100;
pSnapshot->lastApplyTerm = gSnapshotLastApplyTerm;
return 0;
}
......@@ -94,17 +96,18 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
static int readIter = 0;
if (readIter == 5) {
*len = 0;
*ppBuf = NULL;
} else if (readIter < 5) {
*len = 20;
*ppBuf = taosMemoryMalloc(*len);
snprintf((char*)*ppBuf, *len, "data iter:%d", readIter);
} else {
*len = 0;
*ppBuf = NULL;
}
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotDoRead== pFsm:%p, pReader:%p, *len:%d *ppBuf:%s", pFsm,
pReader, *len, (char*)(*ppBuf));
snprintf(logBuf, sizeof(logBuf),
"==callback== ==SnapshotDoRead== pFsm:%p, pReader:%p, *len:%d, *ppBuf:%s, readIter:%d", pFsm, pReader, *len,
(char*)(*ppBuf), readIter);
sTrace("%s", logBuf);
readIter++;
......@@ -249,7 +252,7 @@ void configChange(int64_t rid, int32_t replicaNum, int32_t myIndex) {
}
void usage(char* exe) {
printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy isConfigChange \n", exe);
printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy isConfigChange lastApplyTerm \n", exe);
}
SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
......@@ -265,7 +268,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) {
int main(int argc, char** argv) {
tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_INFO;
if (argc != 7) {
if (argc != 8) {
usage(argv[0]);
exit(-1);
}
......@@ -276,7 +279,15 @@ int main(int argc, char** argv) {
int32_t writeRecordNum = atoi(argv[4]);
bool isStandBy = atoi(argv[5]);
bool isConfigChange = atoi(argv[6]);
int32_t lastApplyTerm = atoi(argv[7]);
sTrace(
"args: replicaNum:%d, myIndex:%d, lastApplyIndex:%d, writeRecordNum:%d, isStandBy:%d, isConfigChange:%d, "
"lastApplyTerm:%d",
replicaNum, myIndex, lastApplyIndex, writeRecordNum, isStandBy, isConfigChange, lastApplyTerm);
gSnapshotLastApplyIndex = lastApplyIndex;
gSnapshotLastApplyTerm = lastApplyTerm;
if (!isStandBy) {
assert(replicaNum >= 1 && replicaNum <= 5);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册