未验证 提交 1f978688 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #19242 from taosdata/fix/TS-2349

refact: adjust sync log
...@@ -25,29 +25,27 @@ extern "C" { ...@@ -25,29 +25,27 @@ extern "C" {
// SIndexMgr ----------------------------- // SIndexMgr -----------------------------
typedef struct SSyncIndexMgr { typedef struct SSyncIndexMgr {
SRaftId (*replicas)[TSDB_MAX_REPLICA]; SRaftId (*replicas)[TSDB_MAX_REPLICA];
SyncIndex index[TSDB_MAX_REPLICA]; SyncIndex index[TSDB_MAX_REPLICA];
SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function SyncTerm privateTerm[TSDB_MAX_REPLICA]; // for advanced function
int64_t startTimeArr[TSDB_MAX_REPLICA];
int64_t startTimeArr[TSDB_MAX_REPLICA]; int64_t recvTimeArr[TSDB_MAX_REPLICA];
int64_t recvTimeArr[TSDB_MAX_REPLICA];
int32_t replicaNum; int32_t replicaNum;
SSyncNode *pSyncNode; SSyncNode *pNode;
} SSyncIndexMgr; } SSyncIndexMgr;
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode); SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pNode);
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode); void syncIndexMgrUpdate(SSyncIndexMgr *pIndexMgr, SSyncNode *pNode);
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrDestroy(SSyncIndexMgr *pIndexMgr);
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pIndexMgr);
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncIndex index);
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime); void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t startTime);
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime); void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime);
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term);
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -30,12 +30,12 @@ typedef struct SVotesGranted { ...@@ -30,12 +30,12 @@ typedef struct SVotesGranted {
SyncTerm term; SyncTerm term;
int32_t quorum; int32_t quorum;
bool toLeader; bool toLeader;
SSyncNode *pSyncNode; SSyncNode *pNode;
} SVotesGranted; } SVotesGranted;
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode); SVotesGranted *voteGrantedCreate(SSyncNode *pNode);
void voteGrantedDestroy(SVotesGranted *pVotesGranted); void voteGrantedDestroy(SVotesGranted *pVotesGranted);
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode); void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pNode);
bool voteGrantedMajority(SVotesGranted *pVotesGranted); bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
...@@ -45,12 +45,12 @@ typedef struct SVotesRespond { ...@@ -45,12 +45,12 @@ typedef struct SVotesRespond {
bool isRespond[TSDB_MAX_REPLICA]; bool isRespond[TSDB_MAX_REPLICA];
int32_t replicaNum; int32_t replicaNum;
SyncTerm term; SyncTerm term;
SSyncNode *pSyncNode; SSyncNode *pNode;
} SVotesRespond; } SVotesRespond;
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); SVotesRespond *votesRespondCreate(SSyncNode *pNode);
void votesRespondDestory(SVotesRespond *pVotesRespond); void votesRespondDestory(SVotesRespond *pVotesRespond);
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode); void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pNode);
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
......
...@@ -17,173 +17,172 @@ ...@@ -17,173 +17,172 @@
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncUtil.h" #include "syncUtil.h"
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pNode) {
SSyncIndexMgr *pSyncIndexMgr = taosMemoryCalloc(1, sizeof(SSyncIndexMgr)); SSyncIndexMgr *pIndexMgr = taosMemoryCalloc(1, sizeof(SSyncIndexMgr));
if (pSyncIndexMgr == NULL) { if (pIndexMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pSyncIndexMgr->replicas = &(pSyncNode->replicasId); pIndexMgr->replicas = &pNode->replicasId;
pSyncIndexMgr->replicaNum = pSyncNode->replicaNum; pIndexMgr->replicaNum = pNode->replicaNum;
pSyncIndexMgr->pSyncNode = pSyncNode; pIndexMgr->pNode = pNode;
syncIndexMgrClear(pSyncIndexMgr); syncIndexMgrClear(pIndexMgr);
return pSyncIndexMgr; return pIndexMgr;
} }
void syncIndexMgrUpdate(SSyncIndexMgr *pSyncIndexMgr, SSyncNode *pSyncNode) { void syncIndexMgrUpdate(SSyncIndexMgr *pIndexMgr, SSyncNode *pNode) {
pSyncIndexMgr->replicas = &(pSyncNode->replicasId); pIndexMgr->replicas = &pNode->replicasId;
pSyncIndexMgr->replicaNum = pSyncNode->replicaNum; pIndexMgr->replicaNum = pNode->replicaNum;
pSyncIndexMgr->pSyncNode = pSyncNode; pIndexMgr->pNode = pNode;
syncIndexMgrClear(pSyncIndexMgr); syncIndexMgrClear(pIndexMgr);
} }
void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) { void syncIndexMgrDestroy(SSyncIndexMgr *pIndexMgr) {
if (pSyncIndexMgr != NULL) { if (pIndexMgr != NULL) {
taosMemoryFree(pSyncIndexMgr); taosMemoryFree(pIndexMgr);
} }
} }
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr) { void syncIndexMgrClear(SSyncIndexMgr *pIndexMgr) {
memset(pSyncIndexMgr->index, 0, sizeof(pSyncIndexMgr->index)); memset(pIndexMgr->index, 0, sizeof(pIndexMgr->index));
memset(pSyncIndexMgr->privateTerm, 0, sizeof(pSyncIndexMgr->privateTerm)); memset(pIndexMgr->privateTerm, 0, sizeof(pIndexMgr->privateTerm));
// int64_t timeNow = taosGetMonotonicMs();
int64_t timeNow = taosGetTimestampMs(); int64_t timeNow = taosGetTimestampMs();
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pIndexMgr->replicaNum; ++i) {
pSyncIndexMgr->startTimeArr[i] = 0; pIndexMgr->startTimeArr[i] = 0;
pSyncIndexMgr->recvTimeArr[i] = timeNow; pIndexMgr->recvTimeArr[i] = timeNow;
} }
/*
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
pSyncIndexMgr->index[i] = 0;
}
*/
} }
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index) { void syncIndexMgrSetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncIndex index) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
(pSyncIndexMgr->index)[i] = index; (pIndexMgr->index)[i] = index;
return; return;
} }
} }
// maybe config change
// ASSERT(0);
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, index mgr set for %s:%d, index:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, sError("vgId:%d, indexmgr set index:%" PRId64 " for %s:%d failed", pIndexMgr->pNode->vgId, index, host, port);
index);
} }
SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pDestId) { SSyncLogReplMgr *syncNodeGetLogReplMgr(SSyncNode *pNode, SRaftId *pRaftId) {
for (int i = 0; i < pNode->replicaNum; i++) { for (int i = 0; i < pNode->replicaNum; i++) {
if (syncUtilSameId(&(pNode->replicasId[i]), pDestId)) { if (syncUtilSameId(&pNode->replicasId[i], pRaftId)) {
return pNode->logReplMgrs[i]; return pNode->logReplMgrs[i];
} }
} }
char host[128];
uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get replmgr from %s:%d failed", pNode->vgId, host, port);
return NULL; return NULL;
} }
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
if (pSyncIndexMgr == NULL) { for (int i = 0; i < pIndexMgr->replicaNum; ++i) {
return SYNC_INDEX_INVALID; if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
} SyncIndex idx = (pIndexMgr->index)[i];
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
SyncIndex idx = (pSyncIndexMgr->index)[i];
return idx; return idx;
} }
} }
char host[128];
uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get index from %s:%d failed", pIndexMgr->pNode->vgId, host, port);
return SYNC_INDEX_INVALID; return SYNC_INDEX_INVALID;
} }
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) { void syncIndexMgrSetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t startTime) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
(pSyncIndexMgr->startTimeArr)[i] = startTime; (pIndexMgr->startTimeArr)[i] = startTime;
return; return;
} }
} }
// maybe config change
// ASSERT(0);
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, index mgr set for %s:%d, start-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, sError("vgId:%d, indexmgr set start-time:%" PRId64 " for %s:%d failed", pIndexMgr->pNode->vgId, startTime, host,
startTime); port);
} }
int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
int64_t startTime = (pSyncIndexMgr->startTimeArr)[i]; int64_t startTime = (pIndexMgr->startTimeArr)[i];
return startTime; return startTime;
} }
} }
ASSERT(0);
char host[128];
uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get start-time from %s:%d failed", pIndexMgr->pNode->vgId, host, port);
return -1; return -1;
} }
void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime) { void syncIndexMgrSetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, int64_t recvTime) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
(pSyncIndexMgr->recvTimeArr)[i] = recvTime; (pIndexMgr->recvTimeArr)[i] = recvTime;
return; return;
} }
} }
// maybe config change
// ASSERT(0);
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, index mgr set for %s:%d, recv-time:%" PRId64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, sError("vgId:%d, indexmgr set recv-time:%" PRId64 " for %s:%d failed", pIndexMgr->pNode->vgId, recvTime, host, port);
recvTime);
} }
int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
int64_t recvTime = (pSyncIndexMgr->recvTimeArr)[i]; int64_t recvTime = (pIndexMgr->recvTimeArr)[i];
return recvTime; return recvTime;
} }
} }
char host[128];
uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get recv-time from %s:%d failed", pIndexMgr->pNode->vgId, host, port);
return -1; return -1;
} }
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) { void syncIndexMgrSetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
(pSyncIndexMgr->privateTerm)[i] = term; (pIndexMgr->privateTerm)[i] = term;
return; return;
} }
} }
// maybe config change
// ASSERT(0);
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, index mgr set for %s:%d, term:%" PRIu64 " error", pSyncIndexMgr->pSyncNode->vgId, host, port, term); sError("vgId:%d, indexmgr set term:%" PRId64 " for %s:%d failed", pIndexMgr->pNode->vgId, term, host, port);
} }
SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId) { SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pIndexMgr, const SRaftId *pRaftId) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { for (int i = 0; i < pIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) { if (syncUtilSameId(&((*(pIndexMgr->replicas))[i]), pRaftId)) {
SyncTerm term = (pSyncIndexMgr->privateTerm)[i]; SyncTerm term = (pIndexMgr->privateTerm)[i];
return term; return term;
} }
} }
ASSERT(0);
char host[128];
uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
sError("vgId:%d, indexmgr get term from %s:%d failed", pIndexMgr->pNode->vgId, host, port);
return -1; return -1;
} }
...@@ -118,12 +118,12 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { ...@@ -118,12 +118,12 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL); SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
int cnt = 0; int cnt = 0;
int sum = 0; int sum = 0;
SSyncNode *pSyncNode = pObj->data; SSyncNode *pNode = pObj->data;
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t)); SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
if (delIndexArray == NULL) return; if (delIndexArray == NULL) return;
sDebug("vgId:%d, resp manager begin clean by ttl", pSyncNode->vgId); sDebug("vgId:%d, resp manager begin clean by ttl", pNode->vgId);
while (pStub) { while (pStub) {
size_t len; size_t len;
void *key = taosHashGetKey(pStub, &len); void *key = taosHashGetKey(pStub, &len);
...@@ -140,20 +140,18 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { ...@@ -140,20 +140,18 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
.lastConfigIndex = SYNC_INDEX_INVALID, .lastConfigIndex = SYNC_INDEX_INVALID,
.isWeak = false, .isWeak = false,
.code = TSDB_CODE_SYN_TIMEOUT, .code = TSDB_CODE_SYN_TIMEOUT,
.state = pSyncNode->state, .state = pNode->state,
.seqNum = *pSeqNum, .seqNum = *pSeqNum,
.term = SYNC_TERM_INVALID, .term = SYNC_TERM_INVALID,
.currentTerm = pSyncNode->pRaftStore->currentTerm, .currentTerm = pNode->pRaftStore->currentTerm,
.flag = 0, .flag = 0,
}; };
pStub->rpcMsg.pCont = NULL; pStub->rpcMsg.pCont = NULL;
pStub->rpcMsg.contLen = 0; pStub->rpcMsg.contLen = 0;
// TODO: and make rpcMsg body, call commit cb
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta);
SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT}; SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT};
sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pSyncNode->vgId, rpcMsg.info.handle, sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pNode->vgId, rpcMsg.info.handle,
TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle); TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
} }
...@@ -162,12 +160,12 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { ...@@ -162,12 +160,12 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
} }
int32_t arraySize = taosArrayGetSize(delIndexArray); int32_t arraySize = taosArrayGetSize(delIndexArray);
sDebug("vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize); sDebug("vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d", pNode->vgId, sum, cnt, arraySize);
for (int32_t i = 0; i < arraySize; ++i) { for (int32_t i = 0; i < arraySize; ++i) {
uint64_t *pSeqNum = taosArrayGet(delIndexArray, i); uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t)); taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pSyncNode->vgId, *pSeqNum); sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pNode->vgId, *pSeqNum);
} }
taosArrayDestroy(delIndexArray); taosArrayDestroy(delIndexArray);
} }
......
...@@ -406,7 +406,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p ...@@ -406,7 +406,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
} }
// just set start = false // just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL // FpSnapshotStopWrite should not be called
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter); sRInfo(pReceiver, "snapshot receiver stop, not apply, writer:%p", pReceiver->pWriter);
......
...@@ -23,21 +23,21 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { ...@@ -23,21 +23,21 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
pVotesGranted->votes = 0; pVotesGranted->votes = 0;
} }
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { SVotesGranted *voteGrantedCreate(SSyncNode *pNode) {
SVotesGranted *pVotesGranted = taosMemoryCalloc(1, sizeof(SVotesGranted)); SVotesGranted *pVotesGranted = taosMemoryCalloc(1, sizeof(SVotesGranted));
if (pVotesGranted == NULL) { if (pVotesGranted == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pVotesGranted->replicas = &(pSyncNode->replicasId); pVotesGranted->replicas = &pNode->replicasId;
pVotesGranted->replicaNum = pSyncNode->replicaNum; pVotesGranted->replicaNum = pNode->replicaNum;
voteGrantedClearVotes(pVotesGranted); voteGrantedClearVotes(pVotesGranted);
pVotesGranted->term = 0; pVotesGranted->term = 0;
pVotesGranted->quorum = pSyncNode->quorum; pVotesGranted->quorum = pNode->quorum;
pVotesGranted->toLeader = false; pVotesGranted->toLeader = false;
pVotesGranted->pSyncNode = pSyncNode; pVotesGranted->pNode = pNode;
return pVotesGranted; return pVotesGranted;
} }
...@@ -48,33 +48,33 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) { ...@@ -48,33 +48,33 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
} }
} }
void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) { void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pNode) {
pVotesGranted->replicas = &(pSyncNode->replicasId); pVotesGranted->replicas = &pNode->replicasId;
pVotesGranted->replicaNum = pSyncNode->replicaNum; pVotesGranted->replicaNum = pNode->replicaNum;
voteGrantedClearVotes(pVotesGranted); voteGrantedClearVotes(pVotesGranted);
pVotesGranted->term = 0; pVotesGranted->term = 0;
pVotesGranted->quorum = pSyncNode->quorum; pVotesGranted->quorum = pNode->quorum;
pVotesGranted->toLeader = false; pVotesGranted->toLeader = false;
pVotesGranted->pSyncNode = pSyncNode; pVotesGranted->pNode = pNode;
} }
bool voteGrantedMajority(SVotesGranted *pVotesGranted) { return pVotesGranted->votes >= pVotesGranted->quorum; } bool voteGrantedMajority(SVotesGranted *pVotesGranted) { return pVotesGranted->votes >= pVotesGranted->quorum; }
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
if (!pMsg->voteGranted) { if (!pMsg->voteGranted) {
sNFatal(pVotesGranted->pSyncNode, "vote granted should be true"); sNFatal(pVotesGranted->pNode, "vote granted should be true");
return; return;
} }
if (pMsg->term != pVotesGranted->term) { if (pMsg->term != pVotesGranted->term) {
sNTrace(pVotesGranted->pSyncNode, "vote grant term:%" PRId64 " not matched with msg term:%" PRId64, sNTrace(pVotesGranted->pNode, "vote grant term:%" PRId64 " not matched with msg term:%" PRId64, pVotesGranted->term,
pVotesGranted->term, pMsg->term); pMsg->term);
return; return;
} }
if (!syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)) { if (!syncUtilSameId(&pVotesGranted->pNode->myRaftId, &pMsg->destId)) {
sNFatal(pVotesGranted->pSyncNode, "vote granted raftId not matched with msg"); sNFatal(pVotesGranted->pNode, "vote granted raftId not matched with msg");
return; return;
} }
...@@ -86,7 +86,7 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { ...@@ -86,7 +86,7 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
} }
} }
if ((j == -1) || !(j >= 0 && j < pVotesGranted->replicaNum)) { if ((j == -1) || !(j >= 0 && j < pVotesGranted->replicaNum)) {
sNFatal(pVotesGranted->pSyncNode, "invalid msg srcId, index:%d", j); sNFatal(pVotesGranted->pNode, "invalid msg srcId, index:%d", j);
return; return;
} }
...@@ -96,7 +96,7 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { ...@@ -96,7 +96,7 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
} }
if (pVotesGranted->votes > pVotesGranted->replicaNum) { if (pVotesGranted->votes > pVotesGranted->replicaNum) {
sNFatal(pVotesGranted->pSyncNode, "votes:%d not matched with replicaNum:%d", pVotesGranted->votes, sNFatal(pVotesGranted->pNode, "votes:%d not matched with replicaNum:%d", pVotesGranted->votes,
pVotesGranted->replicaNum); pVotesGranted->replicaNum);
return; return;
} }
...@@ -108,17 +108,17 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { ...@@ -108,17 +108,17 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
pVotesGranted->toLeader = false; pVotesGranted->toLeader = false;
} }
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { SVotesRespond *votesRespondCreate(SSyncNode *pNode) {
SVotesRespond *pVotesRespond = taosMemoryCalloc(1, sizeof(SVotesRespond)); SVotesRespond *pVotesRespond = taosMemoryCalloc(1, sizeof(SVotesRespond));
if (pVotesRespond == NULL) { if (pVotesRespond == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pVotesRespond->replicas = &(pSyncNode->replicasId); pVotesRespond->replicas = &pNode->replicasId;
pVotesRespond->replicaNum = pSyncNode->replicaNum; pVotesRespond->replicaNum = pNode->replicaNum;
pVotesRespond->term = 0; pVotesRespond->term = 0;
pVotesRespond->pSyncNode = pSyncNode; pVotesRespond->pNode = pNode;
return pVotesRespond; return pVotesRespond;
} }
...@@ -129,11 +129,11 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) { ...@@ -129,11 +129,11 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) {
} }
} }
void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSyncNode) { void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pNode) {
pVotesRespond->replicas = &(pSyncNode->replicasId); pVotesRespond->replicas = &pNode->replicasId;
pVotesRespond->replicaNum = pSyncNode->replicaNum; pVotesRespond->replicaNum = pNode->replicaNum;
pVotesRespond->term = 0; pVotesRespond->term = 0;
pVotesRespond->pSyncNode = pSyncNode; pVotesRespond->pNode = pNode;
} }
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
...@@ -149,7 +149,7 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { ...@@ -149,7 +149,7 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) { void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
if (pVotesRespond->term != pMsg->term) { if (pVotesRespond->term != pMsg->term) {
sNTrace(pVotesRespond->pSyncNode, "vote respond add error"); sNTrace(pVotesRespond->pNode, "vote respond add error");
return; return;
} }
...@@ -160,7 +160,7 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p ...@@ -160,7 +160,7 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p
} }
} }
sNFatal(pVotesRespond->pSyncNode, "votes respond not found"); sNFatal(pVotesRespond->pNode, "votes respond not found");
} }
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) { void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册