未验证 提交 732d69db 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14211 from taosdata/feature/3.0_mhli

refactor(sync): adjust log buf size
......@@ -200,6 +200,7 @@ const char* syncGetMyRoleStr(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
bool syncEnvIsStart();
const char* syncStr(ESyncState state);
......
......@@ -198,6 +198,10 @@ int32_t mndInitSync(SMnode *pMnode) {
return -1;
}
// decrease election timer
setElectTimerMS(pMgmt->sync, 600);
setHeartbeatTimerMS(pMgmt->sync, 300);
mDebug("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
return 0;
}
......@@ -261,7 +265,8 @@ bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
if (!syncIsReady(pMgmt->sync)) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
// get terrno from syncIsReady
// terrno = TSDB_CODE_SYN_NOT_LEADER;
return false;
}
......
......@@ -144,11 +144,15 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
SEpSet newEpSet = {0};
syncGetRetryEpSet(pVnode->sync, &newEpSet);
/*
syncGetEpSet(pVnode->sync, &newEpSet);
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
}
*/
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
newEpSet.inUse);
......
......@@ -105,9 +105,16 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
}
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
sTrace("syncNodeRequestVote pSyncNode:%p ", pSyncNode);
int32_t ret = 0;
do {
char host[128];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-request-vote to %s:%d, {term:%lu, last-index:%ld, last-term:%lu}", pSyncNode->vgId, host,
port, pMsg->term, pMsg->lastLogTerm, pMsg->lastLogIndex);
} while (0);
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
......
......@@ -370,6 +370,15 @@ bool syncIsReady(int64_t rid) {
ASSERT(rid == pSyncNode->rid);
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
// if false, set error code
if (false == b) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
}
return b;
}
......@@ -480,12 +489,30 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
(pEpSet->numOfEps)++;
sInfo("syncGetEpSet index:%d %s:%d", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
sInfo("vgId:%d sync get epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
sInfo("vgId:%d sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
sInfo("syncGetEpSet pEpSet->inUse:%d ", pEpSet->inUse);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
memset(pEpSet, 0, sizeof(*pEpSet));
return;
}
ASSERT(rid == pSyncNode->rid);
pEpSet->numOfEps = 0;
for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
(pEpSet->numOfEps)++;
sInfo("vgId:%d sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
sInfo("vgId:%d sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}
......
......@@ -224,8 +224,8 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug(
"vgId:%d, send sync-append-entries to %s:%d, term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, commit:%ld, "
"datalen:%d",
"vgId:%d, send sync-append-entries to %s:%d, {term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, commit:%ld, "
"datalen:%d}",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen);
} while (0);
......
......@@ -99,15 +99,20 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0;
// print log
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVote, currentTerm:%lu", ths->pRaftStore->currentTerm);
syncRequestVoteLog2(logBuf, pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
sInfo("recv SyncRequestVote maybe replica already dropped");
return ret;
do {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, term:%lu, lindex:%ld, lterm:%lu, maybe replica already dropped",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(ths, logBuf);
} while (0);
return -1;
}
// maybe update term
......@@ -135,10 +140,22 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
pReply->term = ths->pRaftStore->currentTerm;
pReply->voteGranted = grant;
// trace log
do {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, term:%lu, lindex:%ld, lterm:%lu, reply-grant:%d", host, port,
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncRequestVoteReplyDestroy(pReply);
return ret;
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册