未验证 提交 93c9b2a4 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19050 from taosdata/fix/TD-21176

refact: update sync log
......@@ -227,7 +227,7 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);
......
......@@ -86,7 +86,7 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceive
// on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, const SRpcMsg *pMsg);
int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex);
......
......@@ -100,12 +100,6 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s);
void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, int64_t timeDiff, const char* s);
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s);
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s);
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s);
......
......@@ -194,7 +194,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
code = syncNodeOnSnapshot(pSyncNode, pMsg);
break;
case TDMT_SYNC_SNAPSHOT_RSP:
code = syncNodeOnSnapshotReply(pSyncNode, pMsg);
code = syncNodeOnSnapshotRsp(pSyncNode, pMsg);
break;
case TDMT_SYNC_LOCAL_CMD:
code = syncNodeOnLocalCmd(pSyncNode, pMsg);
......@@ -705,7 +705,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
sNError(pSyncNode, "sync propose not leader, %s, type:%s", syncStr(pSyncNode->state), TMSG_INFO(pMsg->msgType));
sNError(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType));
return -1;
}
......@@ -890,10 +890,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId;
SSyncCfg* pCfg = &pSyncInfo->syncCfg;
sDebug("vgId:%d, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
sInfo("vgId:%d, start to open sync node, replica:%d selfIndex:%d", pSyncNode->vgId, pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
SNodeInfo* pNode = &pCfg->nodeInfo[i];
sDebug("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
sInfo("vgId:%d, index:%d ep:%s:%u", pSyncNode->vgId, i, pNode->nodeFqdn, pNode->nodePort);
}
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
......@@ -1086,7 +1086,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
// ASSERT(pSender != NULL);
(pSyncNode->senders)[i] = pSender;
sSTrace(pSender, "snapshot sender create new while open, data:%p", pSender);
sSDebug(pSender, "snapshot sender create new while open, data:%p", pSender);
}
// snapshot receivers
......
......@@ -36,21 +36,21 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
taosThreadMutexInit(&(pObj->mutex), NULL);
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, create resp manager", pNode->vgId);
sDebug("vgId:%d, resp manager create", pNode->vgId);
return pObj;
}
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
if (pObj != NULL) {
if (pObj == NULL) return;
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, destroy resp manager", pNode->vgId);
sDebug("vgId:%d, resp manager destroy", pNode->vgId);
taosThreadMutexLock(&pObj->mutex);
taosHashCleanup(pObj->pRespHash);
taosThreadMutexUnlock(&pObj->mutex);
taosThreadMutexDestroy(&(pObj->mutex));
taosMemoryFree(pObj);
}
}
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub) {
......@@ -174,7 +174,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
void syncRespCleanRsp(SSyncRespMgr *pObj) {
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, clean all rsp", pNode->vgId);
sTrace("vgId:%d, clean all resp", pNode->vgId);
taosThreadMutexLock(&pObj->mutex);
syncRespCleanByTTL(pObj, -1, true);
......@@ -183,7 +183,7 @@ void syncRespCleanRsp(SSyncRespMgr *pObj) {
void syncRespClean(SSyncRespMgr *pObj) {
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, clean rsp by ttl", pNode->vgId);
sTrace("vgId:%d, clean resp by ttl", pNode->vgId);
taosThreadMutexLock(&pObj->mutex);
syncRespCleanByTTL(pObj, pObj->ttl, false);
......
......@@ -26,10 +26,9 @@
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoRead != NULL);
if (!condition) return NULL;
SSyncSnapshotSender *pSender = NULL;
if (condition) {
pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
if (pSender == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -49,15 +48,15 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender->endTime = 0;
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot);
pSender->finish = false;
} else {
sError("vgId:%d, cannot create snapshot sender", pSyncNode->vgId);
}
sDebug("vgId:%d, snapshot sender create", pSender->pSyncNode->vgId);
return pSender;
}
void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
if (pSender != NULL) {
if (pSender == NULL) return;
sDebug("vgId:%d, snapshot sender destroy", pSender->pSyncNode->vgId);
// free current block
if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock);
......@@ -72,13 +71,16 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
// free sender
taosMemoryFree(pSender);
}
}
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
ASSERT(!snapshotSenderIsStart(pSender));
if (snapshotSenderIsStart(pSender)) {
sSError(pSender, "vgId:%d, snapshot sender is already start");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
pSender->start = true;
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
......@@ -86,10 +88,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender->pReader = NULL;
pSender->pCurrentBlock = NULL;
pSender->blockLen = 0;
pSender->snapshotParam.start = SYNC_INDEX_INVALID;
pSender->snapshotParam.end = SYNC_INDEX_INVALID;
pSender->snapshot.data = NULL;
pSender->snapshotParam.end = SYNC_INDEX_INVALID;
pSender->snapshot.lastApplyIndex = SYNC_INDEX_INVALID;
......@@ -105,7 +105,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
// build begin msg
SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId);
if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
return -1;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
......@@ -120,15 +123,20 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg->seq = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT;
// send msg
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1;
}
// event log
sSTrace(pSender, "snapshot sender start");
sSDebug(pSender, "snapshot sender start");
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender start");
return 0;
}
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
sSDebug(pSender, "snapshot sender stop, finish:%d reader:%p", finish, pSender->pReader);
// update flag
pSender->start = false;
pSender->finish = finish;
......@@ -147,8 +155,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
pSender->blockLen = 0;
}
// event log
sSTrace(pSender, "snapshot sender stop");
return 0;
}
......@@ -164,18 +170,27 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
// read data
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader,
&(pSender->pCurrentBlock), &(pSender->blockLen));
ASSERT(ret == 0);
&pSender->pCurrentBlock, &pSender->blockLen);
if (ret != 0) {
sSError(pSender, "snapshot sender read failed since %s", terrstr());
return -1;
}
if (pSender->blockLen > 0) {
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
// has read data
} else {
// read finish, update seq to end
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
}
// build msg
SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
return -1;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
......@@ -187,7 +202,6 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
pMsg->lastConfig = pSender->lastConfig;
pMsg->seq = pSender->seq;
// pMsg->privateTerm = pSender->privateTerm;
if (pSender->pCurrentBlock != NULL) {
......@@ -195,27 +209,32 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
}
// send msg
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender send msg failed since %s", terrstr());
return -1;
}
pSender->lastSendTime = taosGetTimestampMs();
// event log
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
sSTrace(pSender, "snapshot sender finish");
sSDebug(pSender, "snapshot sender finish, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender finish");
} else {
sSTrace(pSender, "snapshot sender sending");
sSDebug(pSender, "snapshot sender sending, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender sending");
}
return 0;
}
// send snapshot data from cache
int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
// send current block data
// build msg
SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId);
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "snapshot sender build msg failed since %s", terrstr());
return -1;
}
SyncSnapshotSend *pMsg = rpcMsg.pCont;
pMsg->srcId = pSender->pSyncNode->myRaftId;
......@@ -234,56 +253,63 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
}
// send msg
syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "");
if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "snapshot sender resend msg failed since %s", terrstr());
return -1;
}
pSender->lastSendTime = taosGetTimestampMs();
// event log
sSTrace(pSender, "snapshot sender resend");
sSDebug(pSender, "snapshot sender resend, seq:%d", pSender->seq);
syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, "snapshot sender resend");
return 0;
}
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
ASSERT(pMsg->ack == pSender->seq);
static int32_t snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) {
if (pMsg->ack != pSender->seq) {
sSError(pSender, "snapshot sender update seq failed, ack:%d seq:%d", pMsg->ack, pSender->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
pSender->ack = pMsg->ack;
++(pSender->seq);
pSender->seq++;
sSDebug(pSender, "snapshot sender update seq:%d", pSender->seq);
return 0;
}
// return 0, start ok
// return 1, last snapshot finish ok
// return -1, error
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
sNTrace(pSyncNode, "starting snapshot ...");
sNInfo(pSyncNode, "snapshot sender starting ...");
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
if (pSender == NULL) {
sNError(pSyncNode, "start snapshot error, sender is null");
sNError(pSyncNode, "snapshot sender start error since get failed");
return -1;
}
int32_t code = 0;
if (snapshotSenderIsStart(pSender)) {
sNTrace(pSyncNode, "snapshot sender already start, ignore");
sSError(pSender, "snapshot sender already start, ignore");
return 0;
}
if (!snapshotSenderIsStart(pSender) && pSender->finish &&
taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
sNTrace(pSyncNode, "snapshot sender too frequently, ignore");
if (pSender->finish && taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) {
sSInfo(pSender, "snapshot sender start too frequently, ignore");
return 1;
}
char host[64];
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
sInfo("vgId:%d, start snapshot for peer: %s:%d", pSyncNode->vgId, host, port);
sSInfo(pSender, "snapshot sender start for peer:%s:%u", host, port);
code = snapshotSenderStart(pSender);
int32_t code = snapshotSenderStart(pSender);
if (code != 0) {
sNError(pSyncNode, "snapshot sender start error");
sSError(pSender, "snapshot sender start error since %s", terrstr());
return -1;
}
......@@ -293,10 +319,9 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
if (!condition) return NULL;
SSyncSnapshotReceiver *pReceiver = NULL;
if (condition) {
pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
if (pReceiver == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
......@@ -313,50 +338,60 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver->snapshot.lastApplyTerm = 0;
pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID;
} else {
sError("vgId:%d, cannot create snapshot receiver", pSyncNode->vgId);
}
sDebug("vgId:%d, snapshot receiver create", pSyncNode->vgId);
return pReceiver;
}
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
if (pReceiver != NULL) {
if (pReceiver == NULL) return;
sDebug("vgId:%d, snapshot receiver destroy", pReceiver->pSyncNode->vgId);
// close writer
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
false, &(pReceiver->snapshot));
ASSERT(ret == 0);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&pReceiver->snapshot);
if (ret != 0) {
sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr());
}
pReceiver->pWriter = NULL;
}
// free receiver
taosMemoryFree(pReceiver);
}
}
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
// force stop
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver force stop, writer:%p");
// force close, abandon incomplete data
if (pReceiver->pWriter != NULL) {
// event log
sRTrace(pReceiver, "snapshot receiver force stop");
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&(pReceiver->snapshot));
ASSERT(ret == 0);
&pReceiver->snapshot);
if (ret != 0) {
sRInfo(pReceiver, "snapshot receiver force stop failed since %s", terrstr());
}
pReceiver->pWriter = NULL;
}
pReceiver->start = false;
// event log
// sRTrace(pReceiver, "snapshot receiver force stop");
}
int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
ASSERT(snapshotReceiverIsStart(pReceiver));
if (!snapshotReceiverIsStart(pReceiver)) {
sRError(pReceiver, "snapshot receiver is not start");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pReceiver->pWriter != NULL) {
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
// update ack
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
......@@ -365,25 +400,25 @@ int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
// start writer
ASSERT(pReceiver->pWriter == NULL);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
&(pReceiver->snapshotParam), &(pReceiver->pWriter));
ASSERT(ret == 0);
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam,
&pReceiver->pWriter);
if (ret != 0) {
sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr());
return -1;
}
// event log
sRTrace(pReceiver, "snapshot receiver start writer");
sRInfo(pReceiver, "snapshot receiver start write");
return 0;
}
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) {
if (snapshotReceiverIsStart(pReceiver)) {
sWarn("vgId:%d, snapshot receiver has started.", pReceiver->pSyncNode->vgId);
sRInfo(pReceiver, "snapshot receiver has started");
return 0;
}
......@@ -394,49 +429,57 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend
pReceiver->startTime = pPreMsg->startTime;
// event log
sRTrace(pReceiver, "snapshot receiver start");
sRInfo(pReceiver, "snapshot receiver is start");
return 0;
}
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
if (pReceiver->pWriter != NULL) {
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
&(pReceiver->snapshot));
ASSERT(ret == 0);
&pReceiver->snapshot);
if (ret != 0) {
sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr());
}
pReceiver->pWriter = NULL;
} else {
sRInfo(pReceiver, "snapshot receiver stop, writer is null");
}
pReceiver->start = false;
// event log
sRTrace(pReceiver, "snapshot receiver stop");
return 0;
}
// when recv last snapshot block, apply data into snapshot
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == SYNC_SNAPSHOT_SEQ_END);
if (pMsg->seq != SYNC_SNAPSHOT_SEQ_END) {
sRError(pReceiver, "snapshot receiver seq:%d is invalid", pMsg->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
int32_t code = 0;
if (pReceiver->pWriter != NULL) {
// write data
sRInfo(pReceiver, "snapshot receiver write finish, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
if (pMsg->dataLen > 0) {
code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data,
pMsg->dataLen);
if (code != 0) {
sNError(pReceiver->pSyncNode, "snapshot write error");
sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr());
return -1;
}
}
// reset wal
sRInfo(pReceiver, "snapshot receiver log restore");
code =
pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex);
if (code != 0) {
sNError(pReceiver->pSyncNode, "wal restore from snapshot error");
sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr());
return -1;
}
......@@ -452,10 +495,11 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
}
// stop writer, apply data
sRInfo(pReceiver, "snapshot receiver apply write");
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&(pReceiver->snapshot));
&pReceiver->snapshot);
if (code != 0) {
sNError(pReceiver->pSyncNode, "snapshot stop writer true error");
sRError(pReceiver, "snapshot receiver apply failed since %s", terrstr());
return -1;
}
pReceiver->pWriter = NULL;
......@@ -464,34 +508,48 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
pReceiver->ack = SYNC_SNAPSHOT_SEQ_END;
} else {
sNError(pReceiver->pSyncNode, "snapshot stop writer true error");
sRError(pReceiver, "snapshot receiver finish error since writer is null");
return -1;
}
// event log
sRTrace(pReceiver, "snapshot receiver got last data, finish, apply snapshot");
sRInfo(pReceiver, "snapshot receiver got last data and apply snapshot finished");
return 0;
}
// apply data block
// update progress
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
ASSERT(pMsg->seq == pReceiver->ack + 1);
static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) {
if (pMsg->seq != pReceiver->ack + 1) {
sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pReceiver->pWriter == NULL) {
sRError(pReceiver, "snapshot receiver failed to write data since writer is null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq);
if (pReceiver->pWriter != NULL) {
if (pMsg->dataLen > 0) {
// apply data block
int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
pMsg->data, pMsg->dataLen);
ASSERT(code == 0);
if (code != 0) {
sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr());
return -1;
}
}
// update progress
pReceiver->ack = pMsg->seq;
// event log
sRTrace(pReceiver, "snapshot receiver receiving");
}
sRDebug(pReceiver, "snapshot receiver continue to write finish");
return 0;
}
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
......@@ -499,7 +557,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
if (syncNodeIsMnode(ths)) {
snapStart = SYNC_INDEX_BEGIN;
sNInfo(ths, "snapshot begin index is %" PRId64 " since its mnode", snapStart);
} else {
SSyncLogStoreData *pData = ths->pLogStore->data;
SWal *pWal = pData->pWal;
......@@ -514,6 +572,8 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
} else {
snapStart = ths->commitIndex + 1;
}
sNInfo(ths, "snapshot begin index is %" PRId64, snapStart);
}
return snapStart;
......@@ -521,41 +581,48 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
static int32_t syncNodeOnSnapshotPre(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
int64_t timeNow = taosGetTimestampMs();
if (snapshotReceiverIsStart(pReceiver)) {
// already start
if (pMsg->startTime > pReceiver->startTime) {
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " > msg startTime:%" PRId64 " start receiver",
pReceiver->startTime, pMsg->startTime);
goto _START_RECEIVER;
} else if (pMsg->startTime == pReceiver->startTime) {
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " == msg startTime:%" PRId64 " send reply",
pReceiver->startTime, pMsg->startTime);
goto _SEND_REPLY;
} else {
// ignore
sRInfo(pReceiver, "snapshot receiver startTime:%" PRId64 " < msg startTime:%" PRId64 " ignore",
pReceiver->startTime, pMsg->startTime);
return 0;
}
} else {
// start new
sRInfo(pReceiver, "snapshot receiver not start yet so start new one");
goto _START_RECEIVER;
}
_START_RECEIVER:
if (taosGetTimestampMs() - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
sNError(pSyncNode, "snapshot receiver time skew too much");
if (timeNow - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) {
sRError(pReceiver, "snapshot receiver time skew too much, now:%" PRId64 " msg startTime:%" PRId64, timeNow,
pMsg->startTime);
return -1;
} else {
// waiting for clock match
int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) {
sNTrace(pSyncNode, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
sRInfo(pReceiver, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
timeNow = taosGetTimestampMs();
}
if (snapshotReceiverIsStart(pReceiver)) {
sRInfo(pReceiver, "snapshot receiver already start and force stop pre one");
snapshotReceiverForceStop(pReceiver);
}
......@@ -567,7 +634,10 @@ _SEND_REPLY:
; // make complier happy
SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId);
if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
return -1;
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
......@@ -581,8 +651,12 @@ _SEND_REPLY:
pRspMsg->snapBeginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
// send msg
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver pre-snapshot");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver failed to build resp since %s", terrstr());
return -1;
}
return 0;
}
......@@ -591,12 +665,13 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
if (!snapshotReceiverIsStart(pReceiver)) {
sNError(pSyncNode, "snapshot receiver not start");
sRError(pReceiver, "snapshot receiver not start");
return -1;
}
if (pReceiver->startTime != pMsg->startTime) {
sNError(pSyncNode, "snapshot receiver time not equal");
sRError(pReceiver, "snapshot receiver startTime:%" PRId64 " not equal to msg startTime:%" PRId64,
pReceiver->startTime, pMsg->startTime);
return -1;
}
......@@ -605,7 +680,10 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
// build msg
SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId);
if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
return -1;
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
......@@ -619,8 +697,12 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver begin");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
return -1;
}
return 0;
}
......@@ -632,18 +714,22 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
// waiting for clock match
int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) {
sNTrace(pSyncNode, "snapshot receiver transfering waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
sRInfo(pReceiver, "snapshot receiver receiving waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
timeNow = taosGetTimestampMs();
}
if (pMsg->seq == pReceiver->ack + 1) {
snapshotReceiverGotData(pReceiver, pMsg);
if (snapshotReceiverGotData(pReceiver, pMsg) != 0) {
return -1;
}
// build msg
SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId);
if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId)) {
sRError(pReceiver, "snapshot receiver build resp failed since %s", terrstr());
return -1;
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
......@@ -657,8 +743,11 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver receiving");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver send resp failed since %s", terrstr());
return -1;
}
return 0;
}
......@@ -670,7 +759,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// waiting for clock match
int64_t timeNow = taosGetTimestampMs();
while (timeNow < pMsg->startTime) {
sNTrace(pSyncNode, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
sRInfo(pReceiver, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow,
pMsg->startTime);
taosMsleep(10);
timeNow = taosGetTimestampMs();
......@@ -683,7 +772,10 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// build msg
SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId);
if (syncBuildSnapshotSendRsp(&rpcMsg, pSyncNode->vgId) != 0) {
sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr());
return -1;
}
SyncSnapshotRsp *pRspMsg = rpcMsg.pCont;
pRspMsg->srcId = pSyncNode->myRaftId;
......@@ -697,8 +789,12 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start;
// send msg
syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "");
syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end");
if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) {
sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr());
return -1;
}
return 0;
}
......@@ -724,15 +820,16 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
//
int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
SyncSnapshotSend *pMsg = pRpcMsg->pCont;
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
// if already drop replica, do not process
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId))) {
if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config");
return 0;
}
if (pMsg->term < pSyncNode->pRaftStore->currentTerm) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject, small term");
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "reject since small term");
return 0;
}
......@@ -741,45 +838,42 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
}
syncNodeResetElectTimer(pSyncNode);
int32_t code = 0;
SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver;
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "");
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
if (pMsg->seq == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq pre-snapshot");
syncNodeOnSnapshotPre(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_BEGIN) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq begin");
syncNodeOnSnapshotBegin(pSyncNode, pMsg);
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq end");
syncNodeOnSnapshotEnd(pSyncNode, pMsg);
(void)syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode);
if (syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode) != 0) {
sRError(pReceiver, "failed to reinit log buffer since %s", terrstr());
return -1;
}
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
// force close, no response
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process force stop");
snapshotReceiverForceStop(pReceiver);
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "process seq");
syncNodeOnSnapshotTransfering(pSyncNode, pMsg);
} else {
// error log
sRTrace(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
sRError(pReceiver, "snapshot receiver recv error seq:%d, my ack:%d", pMsg->seq, pReceiver->ack);
return -1;
}
} else {
// error log
sRTrace(pReceiver, "snapshot receiver term not equal");
sRError(pReceiver, "snapshot receiver term not equal");
return -1;
}
} else {
// error log
sRTrace(pReceiver, "snapshot receiver not follower");
sRError(pReceiver, "snapshot receiver not follower");
return -1;
}
......@@ -789,20 +883,26 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// get sender
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
ASSERT(pSender != NULL);
if (pSender == NULL) {
sNError(pSyncNode, "prepare snapshot error since sender is null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
SSnapshot snapshot;
SSnapshot snapshot = {0};
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
// prepare <begin, end>
pSender->snapshotParam.start = pMsg->snapBeginIndex;
pSender->snapshotParam.end = snapshot.lastApplyIndex;
sNTrace(pSyncNode, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
sSInfo(pSender, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64,
pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
sNError(pSyncNode, "snapshot last index too small");
sSError(pSender, "prepare snapshot failed since beginIndex:%d larger than applyIndex:%d", pMsg->snapBeginIndex,
snapshot.lastApplyIndex);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
......@@ -812,7 +912,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
// start reader
int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader));
if (code != 0) {
sNError(pSyncNode, "create snapshot reader error");
sSError(pSender, "prepare snapshot failed since %s", terrstr());
return -1;
}
......@@ -824,7 +924,10 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
// build begin msg
SRpcMsg rpcMsg = {0};
(void)syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId);
if (syncBuildSnapshotSend(&rpcMsg, 0, pSender->pSyncNode->vgId) != 0) {
sSError(pSender, "prepare snapshot failed since build msg error");
return -1;
}
SyncSnapshotSend *pSendMsg = rpcMsg.pCont;
pSendMsg->srcId = pSender->pSyncNode->myRaftId;
......@@ -839,8 +942,11 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
pSendMsg->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
// send msg
syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg);
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "");
syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, "snapshot sender reply pre");
if (syncNodeSendMsgById(&pSendMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) {
sSError(pSender, "prepare snapshot failed since send msg error");
return -1;
}
return 0;
}
......@@ -851,7 +957,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg)
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
SyncSnapshotRsp *pMsg = pRpcMsg->pCont;
// if already drop replica, do not process
......@@ -861,36 +967,47 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
}
// get sender
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
ASSERT(pSender != NULL);
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId);
if (pSender == NULL) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null");
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
if (pMsg->startTime != pSender->startTime) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender/receiver start time not match");
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender:% " PRId64 " receiver:%" PRId64 " time not match");
return -1;
}
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "");
// state, term, seq/ack
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
if (pMsg->term == pSyncNode->pRaftStore->currentTerm) {
// prepare <begin, end>, send begin msg
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq pre-snapshot");
syncNodeOnSnapshotReplyPre(pSyncNode, pMsg);
return 0;
}
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) {
snapshotSenderUpdateProgress(pSender, pMsg);
snapshotSend(pSender);
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq begin");
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
return -1;
}
if (snapshotSend(pSender) != 0) {
return -1;
}
return 0;
}
// receive ack is finish, close sender
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq end");
snapshotSenderStop(pSender, true);
SSyncLogReplMgr *pMgr = syncNodeGetLogReplMgr(pSyncNode, &pMsg->srcId);
if (pMgr) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "reset repl mgr");
syncLogReplMgrReset(pMgr);
}
return 0;
......@@ -898,12 +1015,18 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
// send next msg
if (pMsg->ack == pSender->seq) {
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq");
// update sender ack
snapshotSenderUpdateProgress(pSender, pMsg);
snapshotSend(pSender);
if (snapshotSenderUpdateProgress(pSender, pMsg) != 0) {
return -1;
}
if (snapshotSend(pSender) != 0) {
return -1;
}
} else if (pMsg->ack == pSender->seq - 1) {
// maybe resend
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "process seq and resend");
snapshotReSend(pSender);
} else {
......
......@@ -277,14 +277,12 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
if (pNode != NULL && pNode->pRaftCfg != NULL) {
taosPrintLog(flags, level, dflag,
"vgId:%d, sync %s "
"%s"
", term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64 ", last-ver:%" PRId64
", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64
", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum,
pNode->becomeLeaderNum, pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum,
pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
......@@ -330,13 +328,13 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
va_end(argpointer);
taosPrintLog(flags, level, dflag,
"vgId:%d, sync %s "
"%s {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64
"vgId:%d, %s, sync:%s, {%p s-param:%" PRId64 " e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64
" lcindex:%" PRId64
" seq:%d ack:%d finish:%d replica-index:%d %s:%d}"
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, pSender, pSender->snapshotParam.start,
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
host, port, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex,
......@@ -382,14 +380,14 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
va_end(argpointer);
taosPrintLog(flags, level, dflag,
"vgId:%d, sync %s "
"%s {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from:%s:%d s-param:%" PRId64
"vgId:%d, %s, sync:%s,"
" {%p start:%d ack:%d term:%" PRIu64 " start-time:%" PRId64 " from:%s:%d s-param:%" PRId64
" e-param:%" PRId64 " laindex:%" PRId64 " laterm:%" PRIu64 " lcindex:%" PRId64
"}"
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, pReceiver, pReceiver->start, pReceiver->ack,
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack,
pReceiver->term, pReceiver->startTime, host, port, pReceiver->snapshotParam.start,
pReceiver->snapshotParam.end, pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm,
pReceiver->snapshot.lastConfigIndex, pNode->pRaftStore->currentTerm, pNode->commitIndex, logBeginIndex,
......@@ -520,95 +518,56 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
port, pMsg->term, pMsg->timeStamp, s, timeDiff);
}
void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot to %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot from %s:%d {term:%" PRId64 "}, %s", host, port, pMsg->term, s);
}
void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "send sync-pre-snapshot-reply to %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host, port,
pMsg->term, pMsg->snapStart, s);
}
void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshotReply* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode, "recv sync-pre-snapshot-reply from %s:%d {term:%" PRId64 ", snap-start:%" PRId64 "}, %s", host,
port, pMsg->term, pMsg->snapStart, s);
}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
if (!(sDebugFlag & DEBUG_DEBUG)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-send to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s);
sNDebug(pSyncNode,
"send sync-snapshot-send to %s:%u, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64
", lterm:%" PRId64 ", stime:%" PRId64,
host, port, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime);
}
void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
if (!(sDebugFlag & DEBUG_DEBUG)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", seq:%d, len:%u}, %s",
host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq,
pMsg->dataLen, s);
sNDebug(pSyncNode,
"recv sync-snapshot-send from %s:%u, %s, seq:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
", lterm:%" PRId64 ", stime:%" PRId64 ", len:%u",
host, port, s, pMsg->seq, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime,
pMsg->dataLen);
}
void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
if (!(sDebugFlag & DEBUG_DEBUG)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-snapshot-rsp to %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
sNDebug(pSyncNode,
"send sync-snapshot-rsp to %s:%u, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
", lterm:%" PRId64 ", stime:%" PRId64,
host, port, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime);
}
void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {
if (!(sDebugFlag & DEBUG_TRACE)) return;
if (!(sDebugFlag & DEBUG_DEBUG)) return;
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64
", stime:%" PRId64 ", ack:%d}, %s",
host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s);
sNDebug(pSyncNode,
"recv sync-snapshot-rsp from %s:%u, %s, ack:%d, term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64
", lterm:%" PRId64 ", stime:%" PRId64,
host, port, s, pMsg->ack, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime);
}
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
......
......@@ -132,7 +132,7 @@ char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
}
int32_t syncNodeOnPreSnapshot(SSyncNode *ths, SyncPreSnapshot *pMsg) {
syncLogRecvSyncPreSnapshot(ths, pMsg, "");
// syncLogRecvSyncPreSnapshot(ths, pMsg, "");
SyncPreSnapshotReply *pMsgReply = syncPreSnapshotReplyBuild(ths->vgId);
pMsgReply->srcId = ths->myRaftId;
......@@ -181,7 +181,7 @@ _IGNORE:
}
int32_t syncNodeOnPreSnapshotReply(SSyncNode *ths, SyncPreSnapshotReply *pMsg) {
syncLogRecvSyncPreSnapshotReply(ths, pMsg, "");
// syncLogRecvSyncPreSnapshotReply(ths, pMsg, "");
// start snapshot
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册