未验证 提交 be2a0e28 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #14075 from taosdata/feature/3.0_mhli

fix(sync): sender get config from local
...@@ -162,7 +162,7 @@ typedef struct SSyncLogStore { ...@@ -162,7 +162,7 @@ typedef struct SSyncLogStore {
SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore); bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore); int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index); // bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
......
...@@ -197,6 +197,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S ...@@ -197,6 +197,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
cJSON* syncNode2Json(const SSyncNode* pSyncNode); cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode);
void syncNodeEventLog(const SSyncNode* pSyncNode, char* str); void syncNodeEventLog(const SSyncNode* pSyncNode, char* str);
void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str);
char* syncNode2SimpleStr(const SSyncNode* pSyncNode); char* syncNode2SimpleStr(const SSyncNode* pSyncNode);
bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config);
void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex); void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex);
......
...@@ -99,19 +99,25 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -99,19 +99,25 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term); syncNodeUpdateTerm(ths, pMsg->term);
} }
assert(pMsg->term <= ths->pRaftStore->currentTerm); ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
// reset elect timer // reset elect timer
if (pMsg->term == ths->pRaftStore->currentTerm) { if (pMsg->term == ths->pRaftStore->currentTerm) {
ths->leaderCache = pMsg->srcId; ths->leaderCache = pMsg->srcId;
syncNodeResetElectTimer(ths); syncNodeResetElectTimer(ths);
} }
assert(pMsg->dataLen >= 0); ASSERT(pMsg->dataLen >= 0);
SyncTerm localPreLogTerm = 0; SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex); SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex);
assert(pEntry != NULL); if (pEntry == NULL) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "getEntry error, index:%ld, since %s", pMsg->prevLogIndex, terrstr());
syncNodeErrorLog(ths, logBuf);
return -1;
}
localPreLogTerm = pEntry->term; localPreLogTerm = pEntry->term;
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
} }
...@@ -160,7 +166,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -160,7 +166,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// accept request // accept request
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) {
// preIndex = -1, or has preIndex entry in local log // preIndex = -1, or has preIndex entry in local log
assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); ASSERT(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore));
// has extra entries (> preIndex) in local log // has extra entries (> preIndex) in local log
bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore);
...@@ -179,13 +185,21 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -179,13 +185,21 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SyncIndex extraIndex = pMsg->prevLogIndex + 1; SyncIndex extraIndex = pMsg->prevLogIndex + 1;
SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex); SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex);
assert(pExtraEntry != NULL); if (pExtraEntry == NULL) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "getEntry error2, index:%ld, since %s", extraIndex, terrstr());
syncNodeErrorLog(ths, logBuf);
return -1;
}
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL); if (pAppendEntry == NULL) {
syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry error");
return -1;
}
// log not match, conflict // log not match, conflict
assert(extraIndex == pAppendEntry->index); ASSERT(extraIndex == pAppendEntry->index);
if (pExtraEntry->term != pAppendEntry->term) { if (pExtraEntry->term != pAppendEntry->term) {
conflict = true; conflict = true;
} }
...@@ -201,7 +215,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -201,7 +215,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
for (SyncIndex index = delEnd; index >= delBegin; --index) { for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) { if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index); SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index);
assert(pRollBackEntry != NULL); if (pRollBackEntry == NULL) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "getEntry error3, index:%ld, since %s", index, terrstr());
syncNodeErrorLog(ths, logBuf);
return -1;
}
// if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) { // if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) {
if (syncUtilUserRollback(pRollBackEntry->msgType)) { if (syncUtilUserRollback(pRollBackEntry->msgType)) {
...@@ -257,7 +276,10 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -257,7 +276,10 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
} else if (!hasExtraEntries && hasAppendEntries) { } else if (!hasExtraEntries && hasAppendEntries) {
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
assert(pAppendEntry != NULL); if (pAppendEntry == NULL) {
syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry2 error");
return -1;
}
// append new entries // append new entries
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
...@@ -287,7 +309,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -287,7 +309,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// do nothing // do nothing
} else { } else {
assert(0); syncNodeLog3("", ths);
ASSERT(0);
} }
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
......
...@@ -67,7 +67,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p ...@@ -67,7 +67,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
return ret; return ret;
} }
assert(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
if (pMsg->success) { if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
......
...@@ -75,7 +75,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -75,7 +75,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (agree) { if (agree) {
// term // term
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
assert(pEntry != NULL); ASSERT(pEntry != NULL);
// cannot commit, even if quorum agree. need check term! // cannot commit, even if quorum agree. need check term!
if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { if (pEntry->term == pSyncNode->pRaftStore->currentTerm) {
......
...@@ -32,7 +32,7 @@ ...@@ -32,7 +32,7 @@
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// //
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
int32_t ret = 0; int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) { for (int i = 0; i < pSyncNode->peersNum; ++i) {
...@@ -44,7 +44,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { ...@@ -44,7 +44,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
pMsg->lastLogTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore); pMsg->lastLogTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore);
ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg); ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg);
assert(ret == 0); ASSERT(ret == 0);
syncRequestVoteDestroy(pMsg); syncRequestVoteDestroy(pMsg);
} }
return ret; return ret;
...@@ -75,7 +75,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -75,7 +75,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollower2Candidate(pSyncNode); syncNodeFollower2Candidate(pSyncNode);
} }
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
// start election // start election
raftStoreNextTerm(pSyncNode->pRaftStore); raftStoreNextTerm(pSyncNode->pRaftStore);
...@@ -86,7 +86,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -86,7 +86,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeVoteForSelf(pSyncNode); syncNodeVoteForSelf(pSyncNode);
if (voteGrantedMajority(pSyncNode->pVotesGranted)) { if (voteGrantedMajority(pSyncNode->pVotesGranted)) {
// only myself, to leader // only myself, to leader
assert(!pSyncNode->pVotesGranted->toLeader); ASSERT(!pSyncNode->pVotesGranted->toLeader);
syncNodeCandidate2Leader(pSyncNode); syncNodeCandidate2Leader(pSyncNode);
pSyncNode->pVotesGranted->toLeader = true; pSyncNode->pVotesGranted->toLeader = true;
return ret; return ret;
...@@ -98,7 +98,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -98,7 +98,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
ret = syncNodeRequestVotePeers(pSyncNode); ret = syncNodeRequestVotePeers(pSyncNode);
} }
assert(ret == 0); ASSERT(ret == 0);
syncNodeResetElectTimer(pSyncNode); syncNodeResetElectTimer(pSyncNode);
return ret; return ret;
......
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
*/ */
#include "syncEnv.h" #include "syncEnv.h"
// #include <assert.h> // #include <ASSERT.h>
SSyncEnv *gSyncEnv = NULL; SSyncEnv *gSyncEnv = NULL;
...@@ -40,7 +40,7 @@ int32_t syncEnvStart() { ...@@ -40,7 +40,7 @@ int32_t syncEnvStart() {
taosSeedRand(seed); taosSeedRand(seed);
// gSyncEnv = doSyncEnvStart(gSyncEnv); // gSyncEnv = doSyncEnvStart(gSyncEnv);
gSyncEnv = doSyncEnvStart(); gSyncEnv = doSyncEnvStart();
assert(gSyncEnv != NULL); ASSERT(gSyncEnv != NULL);
sTrace("sync env start ok"); sTrace("sync env start ok");
return ret; return ret;
} }
...@@ -86,7 +86,7 @@ static void syncEnvTick(void *param, void *tmrId) { ...@@ -86,7 +86,7 @@ static void syncEnvTick(void *param, void *tmrId) {
static SSyncEnv *doSyncEnvStart() { static SSyncEnv *doSyncEnvStart() {
SSyncEnv *pSyncEnv = (SSyncEnv *)taosMemoryMalloc(sizeof(SSyncEnv)); SSyncEnv *pSyncEnv = (SSyncEnv *)taosMemoryMalloc(sizeof(SSyncEnv));
assert(pSyncEnv != NULL); ASSERT(pSyncEnv != NULL);
memset(pSyncEnv, 0, sizeof(SSyncEnv)); memset(pSyncEnv, 0, sizeof(SSyncEnv));
pSyncEnv->envTickTimerCounter = 0; pSyncEnv->envTickTimerCounter = 0;
...@@ -103,7 +103,7 @@ static SSyncEnv *doSyncEnvStart() { ...@@ -103,7 +103,7 @@ static SSyncEnv *doSyncEnvStart() {
} }
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
assert(pSyncEnv == gSyncEnv); ASSERT(pSyncEnv == gSyncEnv);
if (pSyncEnv != NULL) { if (pSyncEnv != NULL) {
atomic_store_8(&(pSyncEnv->isStart), 0); atomic_store_8(&(pSyncEnv->isStart), 0);
taosTmrCleanUp(pSyncEnv->pTimerManager); taosTmrCleanUp(pSyncEnv->pTimerManager);
......
...@@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io); ...@@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io);
static void * syncIOConsumerFunc(void *param); static void *syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
...@@ -47,11 +47,11 @@ static void syncIOTickPing(void *param, void *tmrId); ...@@ -47,11 +47,11 @@ static void syncIOTickPing(void *param, void *tmrId);
int32_t syncIOStart(char *host, uint16_t port) { int32_t syncIOStart(char *host, uint16_t port) {
int32_t ret = 0; int32_t ret = 0;
gSyncIO = syncIOCreate(host, port); gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL); ASSERT(gSyncIO != NULL);
taosSeedRand(taosGetTimestampSec()); taosSeedRand(taosGetTimestampSec());
ret = syncIOStartInternal(gSyncIO); ret = syncIOStartInternal(gSyncIO);
assert(ret == 0); ASSERT(ret == 0);
sTrace("syncIOStart ok, gSyncIO:%p", gSyncIO); sTrace("syncIOStart ok, gSyncIO:%p", gSyncIO);
return ret; return ret;
...@@ -59,16 +59,16 @@ int32_t syncIOStart(char *host, uint16_t port) { ...@@ -59,16 +59,16 @@ int32_t syncIOStart(char *host, uint16_t port) {
int32_t syncIOStop() { int32_t syncIOStop() {
int32_t ret = syncIOStopInternal(gSyncIO); int32_t ret = syncIOStopInternal(gSyncIO);
assert(ret == 0); ASSERT(ret == 0);
ret = syncIODestroy(gSyncIO); ret = syncIODestroy(gSyncIO);
assert(ret == 0); ASSERT(ret == 0);
return ret; return ret;
} }
int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
assert(pEpSet->inUse == 0); ASSERT(pEpSet->inUse == 0);
assert(pEpSet->numOfEps == 1); ASSERT(pEpSet->numOfEps == 1);
int32_t ret = 0; int32_t ret = 0;
{ {
...@@ -107,25 +107,25 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { ...@@ -107,25 +107,25 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t syncIOQTimerStart() { int32_t syncIOQTimerStart() {
int32_t ret = syncIOStartQ(gSyncIO); int32_t ret = syncIOStartQ(gSyncIO);
assert(ret == 0); ASSERT(ret == 0);
return ret; return ret;
} }
int32_t syncIOQTimerStop() { int32_t syncIOQTimerStop() {
int32_t ret = syncIOStopQ(gSyncIO); int32_t ret = syncIOStopQ(gSyncIO);
assert(ret == 0); ASSERT(ret == 0);
return ret; return ret;
} }
int32_t syncIOPingTimerStart() { int32_t syncIOPingTimerStart() {
int32_t ret = syncIOStartPing(gSyncIO); int32_t ret = syncIOStartPing(gSyncIO);
assert(ret == 0); ASSERT(ret == 0);
return ret; return ret;
} }
int32_t syncIOPingTimerStop() { int32_t syncIOPingTimerStop() {
int32_t ret = syncIOStopPing(gSyncIO); int32_t ret = syncIOStopPing(gSyncIO);
assert(ret == 0); ASSERT(ret == 0);
return ret; return ret;
} }
...@@ -151,7 +151,7 @@ static SSyncIO *syncIOCreate(char *host, uint16_t port) { ...@@ -151,7 +151,7 @@ static SSyncIO *syncIOCreate(char *host, uint16_t port) {
static int32_t syncIODestroy(SSyncIO *io) { static int32_t syncIODestroy(SSyncIO *io) {
int32_t ret = 0; int32_t ret = 0;
int8_t start = atomic_load_8(&io->isStart); int8_t start = atomic_load_8(&io->isStart);
assert(start == 0); ASSERT(start == 0);
if (io->serverRpc != NULL) { if (io->serverRpc != NULL) {
rpcClose(io->serverRpc); rpcClose(io->serverRpc);
...@@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { ...@@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
} }
static void *syncIOConsumerFunc(void *param) { static void *syncIOConsumerFunc(void *param) {
SSyncIO * io = param; SSyncIO *io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
...@@ -264,7 +264,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -264,7 +264,7 @@ static void *syncIOConsumerFunc(void *param) {
if (pRpcMsg->msgType == TDMT_SYNC_PING) { if (pRpcMsg->msgType == TDMT_SYNC_PING) {
if (io->FpOnSyncPing != NULL) { if (io->FpOnSyncPing != NULL) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg); io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg); syncPingDestroy(pSyncMsg);
} }
...@@ -272,7 +272,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -272,7 +272,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
if (io->FpOnSyncPingReply != NULL) { if (io->FpOnSyncPingReply != NULL) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} }
...@@ -280,7 +280,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -280,7 +280,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
if (io->FpOnSyncClientRequest != NULL) { if (io->FpOnSyncClientRequest != NULL) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg); io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} }
...@@ -288,7 +288,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -288,7 +288,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
if (io->FpOnSyncRequestVote != NULL) { if (io->FpOnSyncRequestVote != NULL) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg); io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg); syncRequestVoteDestroy(pSyncMsg);
} }
...@@ -296,7 +296,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -296,7 +296,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply != NULL) { if (io->FpOnSyncRequestVoteReply != NULL) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg);
} }
...@@ -304,7 +304,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -304,7 +304,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
if (io->FpOnSyncAppendEntries != NULL) { if (io->FpOnSyncAppendEntries != NULL) {
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg); io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg); syncAppendEntriesDestroy(pSyncMsg);
} }
...@@ -312,7 +312,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -312,7 +312,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
if (io->FpOnSyncAppendEntriesReply != NULL) { if (io->FpOnSyncAppendEntriesReply != NULL) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg); io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} }
...@@ -320,7 +320,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -320,7 +320,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { } else if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
if (io->FpOnSyncTimeout != NULL) { if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg); syncTimeoutDestroy(pSyncMsg);
} }
...@@ -328,7 +328,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -328,7 +328,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { } else if (pRpcMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
if (io->FpOnSyncSnapshotSend != NULL) { if (io->FpOnSyncSnapshotSend != NULL) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pRpcMsg); SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncSnapshotSend(io->pSyncNode, pSyncMsg); io->FpOnSyncSnapshotSend(io->pSyncNode, pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg); syncSnapshotSendDestroy(pSyncMsg);
} }
...@@ -336,7 +336,7 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -336,7 +336,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { } else if (pRpcMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
if (io->FpOnSyncSnapshotRsp != NULL) { if (io->FpOnSyncSnapshotRsp != NULL) {
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pRpcMsg); SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncSnapshotRsp(io->pSyncNode, pSyncMsg); io->FpOnSyncSnapshotRsp(io->pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg); syncSnapshotRspDestroy(pSyncMsg);
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr)); SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr));
assert(pSyncIndexMgr != NULL); ASSERT(pSyncIndexMgr != NULL);
memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr)); memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr));
pSyncIndexMgr->replicas = &(pSyncNode->replicasId); pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
...@@ -63,7 +63,7 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, ...@@ -63,7 +63,7 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
} }
// maybe config change // maybe config change
// assert(0); // ASSERT(0);
char host[128]; char host[128];
uint16_t port; uint16_t port;
...@@ -169,7 +169,7 @@ void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, S ...@@ -169,7 +169,7 @@ void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, S
} }
// maybe config change // maybe config change
// assert(0); // ASSERT(0);
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port);
...@@ -183,5 +183,5 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI ...@@ -183,5 +183,5 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI
return term; return term;
} }
} }
assert(0); ASSERT(0);
} }
\ No newline at end of file
此差异已折叠。
此差异已折叠。
...@@ -24,29 +24,29 @@ SRaftCfg *raftCfgOpen(const char *path) { ...@@ -24,29 +24,29 @@ SRaftCfg *raftCfgOpen(const char *path) {
snprintf(pCfg->path, sizeof(pCfg->path), "%s", path); snprintf(pCfg->path, sizeof(pCfg->path), "%s", path);
pCfg->pFile = taosOpenFile(pCfg->path, TD_FILE_READ | TD_FILE_WRITE); pCfg->pFile = taosOpenFile(pCfg->path, TD_FILE_READ | TD_FILE_WRITE);
assert(pCfg->pFile != NULL); ASSERT(pCfg->pFile != NULL);
taosLSeekFile(pCfg->pFile, 0, SEEK_SET); taosLSeekFile(pCfg->pFile, 0, SEEK_SET);
char buf[1024] = {0}; char buf[1024] = {0};
int len = taosReadFile(pCfg->pFile, buf, sizeof(buf)); int len = taosReadFile(pCfg->pFile, buf, sizeof(buf));
assert(len > 0); ASSERT(len > 0);
int32_t ret = raftCfgFromStr(buf, pCfg); int32_t ret = raftCfgFromStr(buf, pCfg);
assert(ret == 0); ASSERT(ret == 0);
return pCfg; return pCfg;
} }
int32_t raftCfgClose(SRaftCfg *pRaftCfg) { int32_t raftCfgClose(SRaftCfg *pRaftCfg) {
int64_t ret = taosCloseFile(&(pRaftCfg->pFile)); int64_t ret = taosCloseFile(&(pRaftCfg->pFile));
assert(ret == 0); ASSERT(ret == 0);
taosMemoryFree(pRaftCfg); taosMemoryFree(pRaftCfg);
return 0; return 0;
} }
int32_t raftCfgPersist(SRaftCfg *pRaftCfg) { int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
assert(pRaftCfg != NULL); ASSERT(pRaftCfg != NULL);
char *s = raftCfg2Str(pRaftCfg); char *s = raftCfg2Str(pRaftCfg);
taosLSeekFile(pRaftCfg->pFile, 0, SEEK_SET); taosLSeekFile(pRaftCfg->pFile, 0, SEEK_SET);
...@@ -61,10 +61,10 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) { ...@@ -61,10 +61,10 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
snprintf(buf, sizeof(buf), "%s", s); snprintf(buf, sizeof(buf), "%s", s);
int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf)); int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf));
assert(ret == sizeof(buf)); ASSERT(ret == sizeof(buf));
// int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1); // int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1);
// assert(ret == strlen(s) + 1); // ASSERT(ret == strlen(s) + 1);
taosMemoryFree(s); taosMemoryFree(s);
taosFsyncFile(pRaftCfg->pFile); taosFsyncFile(pRaftCfg->pFile);
...@@ -135,27 +135,27 @@ int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) { ...@@ -135,27 +135,27 @@ int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
const cJSON *pJson = pRoot; const cJSON *pJson = pRoot;
cJSON *pReplicaNum = cJSON_GetObjectItem(pJson, "replicaNum"); cJSON *pReplicaNum = cJSON_GetObjectItem(pJson, "replicaNum");
assert(cJSON_IsNumber(pReplicaNum)); ASSERT(cJSON_IsNumber(pReplicaNum));
pSyncCfg->replicaNum = cJSON_GetNumberValue(pReplicaNum); pSyncCfg->replicaNum = cJSON_GetNumberValue(pReplicaNum);
cJSON *pMyIndex = cJSON_GetObjectItem(pJson, "myIndex"); cJSON *pMyIndex = cJSON_GetObjectItem(pJson, "myIndex");
assert(cJSON_IsNumber(pMyIndex)); ASSERT(cJSON_IsNumber(pMyIndex));
pSyncCfg->myIndex = cJSON_GetNumberValue(pMyIndex); pSyncCfg->myIndex = cJSON_GetNumberValue(pMyIndex);
cJSON *pNodeInfoArr = cJSON_GetObjectItem(pJson, "nodeInfo"); cJSON *pNodeInfoArr = cJSON_GetObjectItem(pJson, "nodeInfo");
int arraySize = cJSON_GetArraySize(pNodeInfoArr); int arraySize = cJSON_GetArraySize(pNodeInfoArr);
assert(arraySize == pSyncCfg->replicaNum); ASSERT(arraySize == pSyncCfg->replicaNum);
for (int i = 0; i < arraySize; ++i) { for (int i = 0; i < arraySize; ++i) {
cJSON *pNodeInfo = cJSON_GetArrayItem(pNodeInfoArr, i); cJSON *pNodeInfo = cJSON_GetArrayItem(pNodeInfoArr, i);
assert(pNodeInfo != NULL); ASSERT(pNodeInfo != NULL);
cJSON *pNodePort = cJSON_GetObjectItem(pNodeInfo, "nodePort"); cJSON *pNodePort = cJSON_GetObjectItem(pNodeInfo, "nodePort");
assert(cJSON_IsNumber(pNodePort)); ASSERT(cJSON_IsNumber(pNodePort));
((pSyncCfg->nodeInfo)[i]).nodePort = cJSON_GetNumberValue(pNodePort); ((pSyncCfg->nodeInfo)[i]).nodePort = cJSON_GetNumberValue(pNodePort);
cJSON *pNodeFqdn = cJSON_GetObjectItem(pNodeInfo, "nodeFqdn"); cJSON *pNodeFqdn = cJSON_GetObjectItem(pNodeInfo, "nodeFqdn");
assert(cJSON_IsString(pNodeFqdn)); ASSERT(cJSON_IsString(pNodeFqdn));
snprintf(((pSyncCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pSyncCfg->nodeInfo)[i]).nodeFqdn), "%s", snprintf(((pSyncCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pSyncCfg->nodeInfo)[i]).nodeFqdn), "%s",
pNodeFqdn->valuestring); pNodeFqdn->valuestring);
} }
...@@ -165,10 +165,10 @@ int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) { ...@@ -165,10 +165,10 @@ int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg) { int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg) {
cJSON *pRoot = cJSON_Parse(s); cJSON *pRoot = cJSON_Parse(s);
assert(pRoot != NULL); ASSERT(pRoot != NULL);
int32_t ret = syncCfgFromJson(pRoot, pSyncCfg); int32_t ret = syncCfgFromJson(pRoot, pSyncCfg);
assert(ret == 0); ASSERT(ret == 0);
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return 0; return 0;
...@@ -207,10 +207,10 @@ char *raftCfg2Str(SRaftCfg *pRaftCfg) { ...@@ -207,10 +207,10 @@ char *raftCfg2Str(SRaftCfg *pRaftCfg) {
} }
int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
assert(pCfg != NULL); ASSERT(pCfg != NULL);
TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE); TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE);
assert(pFile != NULL); ASSERT(pFile != NULL);
SRaftCfg raftCfg; SRaftCfg raftCfg;
raftCfg.cfg = *pCfg; raftCfg.cfg = *pCfg;
...@@ -227,10 +227,10 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { ...@@ -227,10 +227,10 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
ASSERT(strlen(s) + 1 <= CONFIG_FILE_LEN); ASSERT(strlen(s) + 1 <= CONFIG_FILE_LEN);
snprintf(buf, sizeof(buf), "%s", s); snprintf(buf, sizeof(buf), "%s", s);
int64_t ret = taosWriteFile(pFile, buf, sizeof(buf)); int64_t ret = taosWriteFile(pFile, buf, sizeof(buf));
assert(ret == sizeof(buf)); ASSERT(ret == sizeof(buf));
// int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1); // int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1);
// assert(ret == strlen(s) + 1); // ASSERT(ret == strlen(s) + 1);
taosMemoryFree(s); taosMemoryFree(s);
taosCloseFile(&pFile); taosCloseFile(&pFile);
...@@ -255,15 +255,15 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { ...@@ -255,15 +255,15 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr"); cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr");
int arraySize = cJSON_GetArraySize(pIndexArr); int arraySize = cJSON_GetArraySize(pIndexArr);
assert(arraySize == pRaftCfg->configIndexCount); ASSERT(arraySize == pRaftCfg->configIndexCount);
memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr)); memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr));
for (int i = 0; i < arraySize; ++i) { for (int i = 0; i < arraySize; ++i) {
cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i); cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i);
assert(pIndexObj != NULL); ASSERT(pIndexObj != NULL);
cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index"); cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index");
assert(cJSON_IsString(pIndex)); ASSERT(cJSON_IsString(pIndex));
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
} }
...@@ -276,10 +276,10 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { ...@@ -276,10 +276,10 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg) { int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg) {
cJSON *pRoot = cJSON_Parse(s); cJSON *pRoot = cJSON_Parse(s);
assert(pRoot != NULL); ASSERT(pRoot != NULL);
int32_t ret = raftCfgFromJson(pRoot, pRaftCfg); int32_t ret = raftCfgFromJson(pRoot, pRaftCfg);
assert(ret == 0); ASSERT(ret == 0);
cJSON_Delete(pRoot); cJSON_Delete(pRoot);
return 0; return 0;
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SSyncRaftEntry) + dataLen; uint32_t bytes = sizeof(SSyncRaftEntry) + dataLen;
SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes); SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes);
assert(pEntry != NULL); ASSERT(pEntry != NULL);
memset(pEntry, 0, bytes); memset(pEntry, 0, bytes);
pEntry->bytes = bytes; pEntry->bytes = bytes;
pEntry->dataLen = dataLen; pEntry->dataLen = dataLen;
...@@ -29,14 +29,14 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { ...@@ -29,14 +29,14 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) {
// step 4. SyncClientRequest => SSyncRaftEntry, add term, index // step 4. SyncClientRequest => SSyncRaftEntry, add term, index
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild3(pMsg, term, index); SSyncRaftEntry* pEntry = syncEntryBuild3(pMsg, term, index);
assert(pEntry != NULL); ASSERT(pEntry != NULL);
return pEntry; return pEntry;
} }
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {
SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen); SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen);
assert(pEntry != NULL); ASSERT(pEntry != NULL);
pEntry->msgType = pMsg->msgType; pEntry->msgType = pMsg->msgType;
pEntry->originalRpcType = pMsg->originalRpcType; pEntry->originalRpcType = pMsg->originalRpcType;
...@@ -63,7 +63,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) ...@@ -63,7 +63,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
memcpy(rpcMsg.pCont, &head, sizeof(head)); memcpy(rpcMsg.pCont, &head, sizeof(head));
SSyncRaftEntry* pEntry = syncEntryBuild(rpcMsg.contLen); SSyncRaftEntry* pEntry = syncEntryBuild(rpcMsg.contLen);
assert(pEntry != NULL); ASSERT(pEntry != NULL);
pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
pEntry->originalRpcType = TDMT_SYNC_NOOP; pEntry->originalRpcType = TDMT_SYNC_NOOP;
...@@ -72,7 +72,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) ...@@ -72,7 +72,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
pEntry->term = term; pEntry->term = term;
pEntry->index = index; pEntry->index = index;
assert(pEntry->dataLen == rpcMsg.contLen); ASSERT(pEntry->dataLen == rpcMsg.contLen);
memcpy(pEntry->data, rpcMsg.pCont, rpcMsg.contLen); memcpy(pEntry->data, rpcMsg.pCont, rpcMsg.contLen);
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
...@@ -88,7 +88,7 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) { ...@@ -88,7 +88,7 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) {
// step 5. SSyncRaftEntry => bin, to raft log // step 5. SSyncRaftEntry => bin, to raft log
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
char* buf = taosMemoryMalloc(pEntry->bytes); char* buf = taosMemoryMalloc(pEntry->bytes);
assert(buf != NULL); ASSERT(buf != NULL);
memcpy(buf, pEntry, pEntry->bytes); memcpy(buf, pEntry, pEntry->bytes);
if (len != NULL) { if (len != NULL) {
*len = pEntry->bytes; *len = pEntry->bytes;
...@@ -100,9 +100,9 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { ...@@ -100,9 +100,9 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) {
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) { SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf); uint32_t bytes = *((uint32_t*)buf);
SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes); SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes);
assert(pEntry != NULL); ASSERT(pEntry != NULL);
memcpy(pEntry, buf, len); memcpy(pEntry, buf, len);
assert(len == pEntry->bytes); ASSERT(len == pEntry->bytes);
return pEntry; return pEntry;
} }
......
...@@ -25,7 +25,7 @@ static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore); ...@@ -25,7 +25,7 @@ static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore); static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore);
static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore); static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore);
static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore); static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore);
static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index);
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore); static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore); static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
...@@ -58,8 +58,6 @@ static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex b ...@@ -58,8 +58,6 @@ static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex b
return 0; return 0;
} }
int32_t raftLogResetBeginIndex(struct SSyncLogStore* pLogStore) { return 0; }
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) { static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
...@@ -81,6 +79,7 @@ static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) { ...@@ -81,6 +79,7 @@ static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
return count > 0 ? count : 0; return count > 0 ? count : 0;
} }
#if 0
static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) { static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) {
SyncIndex beginIndex = raftLogBeginIndex(pLogStore); SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
SyncIndex endIndex = raftLogEndIndex(pLogStore); SyncIndex endIndex = raftLogEndIndex(pLogStore);
...@@ -90,6 +89,7 @@ static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -90,6 +89,7 @@ static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) {
return false; return false;
} }
} }
#endif
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) { static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
SyncIndex lastIndex; SyncIndex lastIndex;
...@@ -143,7 +143,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -143,7 +143,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
SyncIndex writeIndex = raftLogWriteIndex(pLogStore); SyncIndex writeIndex = raftLogWriteIndex(pLogStore);
ASSERT(pEntry->index == writeIndex); if (pEntry->index != writeIndex) {
sError("raftLogAppendEntry error, pEntry->index:%ld update to writeIndex:%ld", pEntry->index, writeIndex);
pEntry->index = writeIndex;
}
int code = 0; int code = 0;
SSyncLogMeta syncMeta; SSyncLogMeta syncMeta;
...@@ -171,6 +174,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -171,6 +174,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
return code; return code;
} }
#if 0
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) { static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
...@@ -215,6 +219,49 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, ...@@ -215,6 +219,49 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
return code; return code;
} }
#endif
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
int32_t code;
*ppEntry = NULL;
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
if (pWalHandle == NULL) {
return -1;
}
code = walReadWithHandle(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t linuxErr = errno;
const char* linuxErrMsg = strerror(errno);
sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg);
walCloseReadHandle(pWalHandle);
return code;
}
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
ASSERT(*ppEntry != NULL);
(*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
(*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
(*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
(*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
(*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
(*ppEntry)->index = index;
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
// need to hold, do not new every time!!
walCloseReadHandle(pWalHandle);
return code;
}
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) { static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
...@@ -245,10 +292,10 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp ...@@ -245,10 +292,10 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
//------------------------------- //-------------------------------
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
assert(pLogStore != NULL); ASSERT(pLogStore != NULL);
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
assert(pLogStore->data != NULL); ASSERT(pLogStore->data != NULL);
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode; pData->pSyncNode = pSyncNode;
...@@ -277,7 +324,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { ...@@ -277,7 +324,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->syncLogEndIndex = raftLogEndIndex; pLogStore->syncLogEndIndex = raftLogEndIndex;
pLogStore->syncLogIsEmpty = raftLogIsEmpty; pLogStore->syncLogIsEmpty = raftLogIsEmpty;
pLogStore->syncLogEntryCount = raftLogEntryCount; pLogStore->syncLogEntryCount = raftLogEntryCount;
pLogStore->syncLogInRange = raftLogInRange;
pLogStore->syncLogLastIndex = raftLogLastIndex; pLogStore->syncLogLastIndex = raftLogLastIndex;
pLogStore->syncLogLastTerm = raftLogLastTerm; pLogStore->syncLogLastTerm = raftLogLastTerm;
pLogStore->syncLogAppendEntry = raftLogAppendEntry; pLogStore->syncLogAppendEntry = raftLogAppendEntry;
...@@ -285,6 +331,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { ...@@ -285,6 +331,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore->syncLogTruncate = raftLogTruncate; pLogStore->syncLogTruncate = raftLogTruncate;
pLogStore->syncLogWriteIndex = raftLogWriteIndex; pLogStore->syncLogWriteIndex = raftLogWriteIndex;
// pLogStore->syncLogInRange = raftLogInRange;
return pLogStore; return pLogStore;
} }
...@@ -301,7 +349,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -301,7 +349,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
SyncIndex lastIndex = logStoreLastIndex(pLogStore); SyncIndex lastIndex = logStoreLastIndex(pLogStore);
assert(pEntry->index == lastIndex + 1); ASSERT(pEntry->index == lastIndex + 1);
int code = 0; int code = 0;
SSyncLogMeta syncMeta; SSyncLogMeta syncMeta;
...@@ -347,10 +395,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -347,10 +395,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
linuxErrMsg); linuxErrMsg);
ASSERT(0); ASSERT(0);
} }
// assert(walReadWithHandle(pWalHandle, index) == 0); // ASSERT(walReadWithHandle(pWalHandle, index) == 0);
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
assert(pEntry != NULL); ASSERT(pEntry != NULL);
pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
pEntry->originalRpcType = pWalHandle->pHead->head.msgType; pEntry->originalRpcType = pWalHandle->pHead->head.msgType;
...@@ -358,7 +406,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -358,7 +406,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
pEntry->term = pWalHandle->pHead->head.syncMeta.term; pEntry->term = pWalHandle->pHead->head.syncMeta.term;
pEntry->index = index; pEntry->index = index;
assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen); ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
// need to hold, do not new every time!! // need to hold, do not new every time!!
...@@ -373,7 +421,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { ...@@ -373,7 +421,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
// assert(walRollback(pWal, fromIndex) == 0); // ASSERT(walRollback(pWal, fromIndex) == 0);
int32_t code = walRollback(pWal, fromIndex); int32_t code = walRollback(pWal, fromIndex);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
...@@ -407,7 +455,7 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { ...@@ -407,7 +455,7 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
// assert(walCommit(pWal, index) == 0); // ASSERT(walCommit(pWal, index) == 0);
int32_t code = walCommit(pWal, index); int32_t code = walCommit(pWal, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
......
...@@ -39,40 +39,40 @@ SRaftStore *raftStoreOpen(const char *path) { ...@@ -39,40 +39,40 @@ SRaftStore *raftStoreOpen(const char *path) {
if (!raftStoreFileExist(pRaftStore->path)) { if (!raftStoreFileExist(pRaftStore->path)) {
ret = raftStoreInit(pRaftStore); ret = raftStoreInit(pRaftStore);
assert(ret == 0); ASSERT(ret == 0);
} }
pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE); pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE);
assert(pRaftStore->pFile != NULL); ASSERT(pRaftStore->pFile != NULL);
int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE); int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE);
assert(len == RAFT_STORE_BLOCK_SIZE); ASSERT(len == RAFT_STORE_BLOCK_SIZE);
ret = raftStoreDeserialize(pRaftStore, storeBuf, len); ret = raftStoreDeserialize(pRaftStore, storeBuf, len);
assert(ret == 0); ASSERT(ret == 0);
return pRaftStore; return pRaftStore;
} }
static int32_t raftStoreInit(SRaftStore *pRaftStore) { static int32_t raftStoreInit(SRaftStore *pRaftStore) {
assert(pRaftStore != NULL); ASSERT(pRaftStore != NULL);
pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE); pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE);
assert(pRaftStore->pFile != NULL); ASSERT(pRaftStore->pFile != NULL);
pRaftStore->currentTerm = 0; pRaftStore->currentTerm = 0;
pRaftStore->voteFor.addr = 0; pRaftStore->voteFor.addr = 0;
pRaftStore->voteFor.vgId = 0; pRaftStore->voteFor.vgId = 0;
int32_t ret = raftStorePersist(pRaftStore); int32_t ret = raftStorePersist(pRaftStore);
assert(ret == 0); ASSERT(ret == 0);
taosCloseFile(&pRaftStore->pFile); taosCloseFile(&pRaftStore->pFile);
return 0; return 0;
} }
int32_t raftStoreClose(SRaftStore *pRaftStore) { int32_t raftStoreClose(SRaftStore *pRaftStore) {
assert(pRaftStore != NULL); ASSERT(pRaftStore != NULL);
taosCloseFile(&pRaftStore->pFile); taosCloseFile(&pRaftStore->pFile);
taosMemoryFree(pRaftStore); taosMemoryFree(pRaftStore);
...@@ -81,17 +81,17 @@ int32_t raftStoreClose(SRaftStore *pRaftStore) { ...@@ -81,17 +81,17 @@ int32_t raftStoreClose(SRaftStore *pRaftStore) {
} }
int32_t raftStorePersist(SRaftStore *pRaftStore) { int32_t raftStorePersist(SRaftStore *pRaftStore) {
assert(pRaftStore != NULL); ASSERT(pRaftStore != NULL);
int32_t ret; int32_t ret;
char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0}; char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0};
ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
assert(ret == 0); ASSERT(ret == 0);
taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET); taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET);
ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf)); ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf));
assert(ret == RAFT_STORE_BLOCK_SIZE); ASSERT(ret == RAFT_STORE_BLOCK_SIZE);
taosFsyncFile(pRaftStore->pFile); taosFsyncFile(pRaftStore->pFile);
return 0; return 0;
...@@ -103,7 +103,7 @@ static bool raftStoreFileExist(char *path) { ...@@ -103,7 +103,7 @@ static bool raftStoreFileExist(char *path) {
} }
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
assert(pRaftStore != NULL); ASSERT(pRaftStore != NULL);
cJSON *pRoot = cJSON_CreateObject(); cJSON *pRoot = cJSON_CreateObject();
...@@ -125,7 +125,7 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { ...@@ -125,7 +125,7 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
char *serialized = cJSON_Print(pRoot); char *serialized = cJSON_Print(pRoot);
int len2 = strlen(serialized); int len2 = strlen(serialized);
assert(len2 < len); ASSERT(len2 < len);
memset(buf, 0, len); memset(buf, 0, len);
snprintf(buf, len, "%s", serialized); snprintf(buf, len, "%s", serialized);
taosMemoryFree(serialized); taosMemoryFree(serialized);
...@@ -135,17 +135,17 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { ...@@ -135,17 +135,17 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
} }
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
assert(pRaftStore != NULL); ASSERT(pRaftStore != NULL);
assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); ASSERT(len > 0 && len <= RAFT_STORE_BLOCK_SIZE);
cJSON *pRoot = cJSON_Parse(buf); cJSON *pRoot = cJSON_Parse(buf);
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
assert(cJSON_IsString(pCurrentTerm)); ASSERT(cJSON_IsString(pCurrentTerm));
sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm)); sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm));
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
assert(cJSON_IsString(pVoteForAddr)); ASSERT(cJSON_IsString(pVoteForAddr));
sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr)); sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr));
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
...@@ -161,7 +161,7 @@ bool raftStoreHasVoted(SRaftStore *pRaftStore) { ...@@ -161,7 +161,7 @@ bool raftStoreHasVoted(SRaftStore *pRaftStore) {
} }
void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) { void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) {
assert(!syncUtilEmptyId(pRaftId)); ASSERT(!syncUtilEmptyId(pRaftId));
pRaftStore->voteFor = *pRaftId; pRaftStore->voteFor = *pRaftId;
raftStorePersist(pRaftStore); raftStorePersist(pRaftStore);
} }
...@@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) { ...@@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char *raftStore2Str(SRaftStore *pRaftStore) { char *raftStore2Str(SRaftStore *pRaftStore) {
cJSON *pJson = raftStore2Json(pRaftStore); cJSON *pJson = raftStore2Json(pRaftStore);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
......
...@@ -49,7 +49,7 @@ ...@@ -49,7 +49,7 @@
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// //
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pNextIndex", pSyncNode->pNextIndex);
syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pMatchIndex", pSyncNode->pMatchIndex); syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pMatchIndex", pSyncNode->pMatchIndex);
...@@ -68,7 +68,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { ...@@ -68,7 +68,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
SyncTerm preLogTerm = 0; SyncTerm preLogTerm = 0;
if (preLogIndex >= SYNC_INDEX_BEGIN) { if (preLogIndex >= SYNC_INDEX_BEGIN) {
SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex); SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex);
assert(pPreEntry != NULL); ASSERT(pPreEntry != NULL);
preLogTerm = pPreEntry->term; preLogTerm = pPreEntry->term;
syncEntryDestory(pPreEntry); syncEntryDestory(pPreEntry);
...@@ -81,12 +81,12 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { ...@@ -81,12 +81,12 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex); SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex);
if (pEntry != NULL) { if (pEntry != NULL) {
pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
assert(pMsg != NULL); ASSERT(pMsg != NULL);
// add pEntry into msg // add pEntry into msg
uint32_t len; uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len); char* serialized = syncEntrySerialize(pEntry, &len);
assert(len == pEntry->bytes); ASSERT(len == pEntry->bytes);
memcpy(pMsg->data, serialized, len); memcpy(pMsg->data, serialized, len);
taosMemoryFree(serialized); taosMemoryFree(serialized);
...@@ -95,10 +95,10 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { ...@@ -95,10 +95,10 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
} else { } else {
// maybe overflow, send empty record // maybe overflow, send empty record
pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
assert(pMsg != NULL); ASSERT(pMsg != NULL);
} }
assert(pMsg != NULL); ASSERT(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId; pMsg->srcId = pSyncNode->myRaftId;
pMsg->destId = *pDestId; pMsg->destId = *pDestId;
pMsg->term = pSyncNode->pRaftStore->currentTerm; pMsg->term = pSyncNode->pRaftStore->currentTerm;
...@@ -148,25 +148,32 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { ...@@ -148,25 +148,32 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
SSyncRaftEntry* pEntry; SSyncRaftEntry* pEntry;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
ASSERT(code == 0);
if (pEntry != NULL) { if (code == 0) {
ASSERT(pEntry != NULL);
pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId);
ASSERT(pMsg != NULL); ASSERT(pMsg != NULL);
// add pEntry into msg // add pEntry into msg
uint32_t len; uint32_t len;
char* serialized = syncEntrySerialize(pEntry, &len); char* serialized = syncEntrySerialize(pEntry, &len);
assert(len == pEntry->bytes); ASSERT(len == pEntry->bytes);
memcpy(pMsg->data, serialized, len); memcpy(pMsg->data, serialized, len);
taosMemoryFree(serialized); taosMemoryFree(serialized);
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
} else { } else {
// no entry in log if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); // no entry in log
ASSERT(pMsg != NULL); pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId);
ASSERT(pMsg != NULL);
} else {
syncNodeLog3("", pSyncNode);
ASSERT(0);
}
} }
// prepare msg // prepare msg
......
...@@ -52,7 +52,7 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -52,7 +52,7 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
syncNodeUpdateTerm(ths, pMsg->term); syncNodeUpdateTerm(ths, pMsg->term);
} }
assert(pMsg->term <= ths->pRaftStore->currentTerm); ASSERT(pMsg->term <= ths->pRaftStore->currentTerm);
bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) ||
((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) &&
......
...@@ -50,7 +50,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) ...@@ -50,7 +50,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
return ret; return ret;
} }
// assert(!(pMsg->term > ths->pRaftStore->currentTerm)); // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm));
// no need this code, because if I receive reply.term, then I must have sent for that term. // no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) { // if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term); // syncNodeUpdateTerm(ths, pMsg->term);
...@@ -65,7 +65,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) ...@@ -65,7 +65,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
return ret; return ret;
} }
assert(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// This tallies votes even when the current state is not Candidate, // This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter. // but they won't be looked at, so it doesn't matter.
...@@ -115,7 +115,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl ...@@ -115,7 +115,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
return ret; return ret;
} }
// assert(!(pMsg->term > ths->pRaftStore->currentTerm)); // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm));
// no need this code, because if I receive reply.term, then I must have sent for that term. // no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) { // if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term); // syncNodeUpdateTerm(ths, pMsg->term);
......
...@@ -22,7 +22,7 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { ...@@ -22,7 +22,7 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
pObj->pRespHash = pObj->pRespHash =
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
assert(pObj->pRespHash != NULL); ASSERT(pObj->pRespHash != NULL);
pObj->ttl = ttl; pObj->ttl = ttl;
pObj->data = data; pObj->data = data;
pObj->seqNum = 0; pObj->seqNum = 0;
......
...@@ -73,41 +73,49 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void ...@@ -73,41 +73,49 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void
pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN;
pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID; pSender->ack = SYNC_SNAPSHOT_SEQ_INVALID;
// open snapshot reader // init snapshot and reader
ASSERT(pSender->pReader == NULL); ASSERT(pSender->pReader == NULL);
pSender->pReader = pReader; pSender->pReader = pReader;
pSender->snapshot = snapshot; pSender->snapshot = snapshot;
/*
// open snapshot reader
ASSERT(pSender->pReader == NULL);
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
ASSERT(ret == 0);
// get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
*/
if (pSender->pCurrentBlock != NULL) { if (pSender->pCurrentBlock != NULL) {
taosMemoryFree(pSender->pCurrentBlock); taosMemoryFree(pSender->pCurrentBlock);
} }
pSender->blockLen = 0; pSender->blockLen = 0;
if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) {
SSyncRaftEntry *pEntry = int32_t code = 0;
pSender->pSyncNode->pLogStore->getEntry(pSender->pSyncNode->pLogStore, pSender->snapshot.lastConfigIndex); SSyncRaftEntry *pEntry = NULL;
ASSERT(pEntry != NULL); code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
pSender->snapshot.lastConfigIndex, &pEntry);
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); bool getLastConfig = false;
SSyncCfg lastConfig; if (code == 0) {
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig); ASSERT(pEntry != NULL);
ASSERT(ret == 0);
pSender->lastConfig = lastConfig; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg);
SSyncCfg lastConfig;
int32_t ret = syncCfgFromStr(rpcMsg.pCont, &lastConfig);
ASSERT(ret == 0);
pSender->lastConfig = lastConfig;
getLastConfig = true;
rpcFreeCont(rpcMsg.pCont);
syncEntryDestory(pEntry);
} else {
if (pSender->snapshot.lastConfigIndex == pSender->pSyncNode->pRaftCfg->lastConfigIndex) {
sTrace("vgId:%d sync sender get cfg from local", pSender->pSyncNode->vgId);
pSender->lastConfig = pSender->pSyncNode->pRaftCfg->cfg;
getLastConfig = true;
}
}
rpcFreeCont(rpcMsg.pCont); if (!getLastConfig) {
syncEntryDestory(pEntry); syncNodeLog3("", pSender->pSyncNode);
ASSERT(0);
}
} else { } else {
memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg)); memset(&(pSender->lastConfig), 0, sizeof(SSyncCfg));
......
...@@ -86,7 +86,7 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { ...@@ -86,7 +86,7 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) {
uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn); uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn);
assert(ipv4 != 0xFFFFFFFF); ASSERT(ipv4 != 0xFFFFFFFF);
char ipbuf[128] = {0}; char ipbuf[128] = {0};
tinet_ntoa(ipbuf, ipv4); tinet_ntoa(ipbuf, ipv4);
raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort); raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort);
...@@ -124,7 +124,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { ...@@ -124,7 +124,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
int32_t syncUtilRand(int32_t max) { return taosRand() % max; } int32_t syncUtilRand(int32_t max) { return taosRand() % max; }
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) { int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
assert(min > 0 && max > 0 && max >= min); ASSERT(min > 0 && max > 0 && max >= min);
return min + syncUtilRand(max - min); return min + syncUtilRand(max - min);
} }
...@@ -201,7 +201,7 @@ bool syncUtilCanPrint(char c) { ...@@ -201,7 +201,7 @@ bool syncUtilCanPrint(char c) {
char* syncUtilprintBin(char* ptr, uint32_t len) { char* syncUtilprintBin(char* ptr, uint32_t len) {
char* s = taosMemoryMalloc(len + 1); char* s = taosMemoryMalloc(len + 1);
assert(s != NULL); ASSERT(s != NULL);
memset(s, 0, len + 1); memset(s, 0, len + 1);
memcpy(s, ptr, len); memcpy(s, ptr, len);
...@@ -216,7 +216,7 @@ char* syncUtilprintBin(char* ptr, uint32_t len) { ...@@ -216,7 +216,7 @@ char* syncUtilprintBin(char* ptr, uint32_t len) {
char* syncUtilprintBin2(char* ptr, uint32_t len) { char* syncUtilprintBin2(char* ptr, uint32_t len) {
uint32_t len2 = len * 4 + 1; uint32_t len2 = len * 4 + 1;
char* s = taosMemoryMalloc(len2); char* s = taosMemoryMalloc(len2);
assert(s != NULL); ASSERT(s != NULL);
memset(s, 0, len2); memset(s, 0, len2);
char* p = s; char* p = s;
......
...@@ -24,7 +24,7 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { ...@@ -24,7 +24,7 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted)); SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted));
assert(pVotesGranted != NULL); ASSERT(pVotesGranted != NULL);
memset(pVotesGranted, 0, sizeof(SVotesGranted)); memset(pVotesGranted, 0, sizeof(SVotesGranted));
pVotesGranted->replicas = &(pSyncNode->replicasId); pVotesGranted->replicas = &(pSyncNode->replicasId);
...@@ -62,9 +62,9 @@ bool voteGrantedMajority(SVotesGranted *pVotesGranted) { ...@@ -62,9 +62,9 @@ bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
} }
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
assert(pMsg->voteGranted == true); ASSERT(pMsg->voteGranted == true);
assert(pMsg->term == pVotesGranted->term); ASSERT(pMsg->term == pVotesGranted->term);
assert(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)); ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));
int j = -1; int j = -1;
for (int i = 0; i < pVotesGranted->replicaNum; ++i) { for (int i = 0; i < pVotesGranted->replicaNum; ++i) {
...@@ -73,14 +73,14 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { ...@@ -73,14 +73,14 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
break; break;
} }
} }
assert(j != -1); ASSERT(j != -1);
assert(j >= 0 && j < pVotesGranted->replicaNum); ASSERT(j >= 0 && j < pVotesGranted->replicaNum);
if (pVotesGranted->isGranted[j] != true) { if (pVotesGranted->isGranted[j] != true) {
++(pVotesGranted->votes); ++(pVotesGranted->votes);
pVotesGranted->isGranted[j] = true; pVotesGranted->isGranted[j] = true;
} }
assert(pVotesGranted->votes <= pVotesGranted->replicaNum); ASSERT(pVotesGranted->votes <= pVotesGranted->replicaNum);
} }
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
...@@ -127,7 +127,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) { ...@@ -127,7 +127,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char *voteGranted2Str(SVotesGranted *pVotesGranted) { char *voteGranted2Str(SVotesGranted *pVotesGranted) {
cJSON *pJson = voteGranted2Json(pVotesGranted); cJSON *pJson = voteGranted2Json(pVotesGranted);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
...@@ -162,7 +162,7 @@ void voteGrantedLog2(char *s, SVotesGranted *pObj) { ...@@ -162,7 +162,7 @@ void voteGrantedLog2(char *s, SVotesGranted *pObj) {
// SVotesRespond ----------------------------- // SVotesRespond -----------------------------
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
SVotesRespond *pVotesRespond = taosMemoryMalloc(sizeof(SVotesRespond)); SVotesRespond *pVotesRespond = taosMemoryMalloc(sizeof(SVotesRespond));
assert(pVotesRespond != NULL); ASSERT(pVotesRespond != NULL);
memset(pVotesRespond, 0, sizeof(SVotesRespond)); memset(pVotesRespond, 0, sizeof(SVotesRespond));
pVotesRespond->replicas = &(pSyncNode->replicasId); pVotesRespond->replicas = &(pSyncNode->replicasId);
...@@ -198,15 +198,15 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { ...@@ -198,15 +198,15 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
} }
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) { void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
assert(pVotesRespond->term == pMsg->term); ASSERT(pVotesRespond->term == pMsg->term);
for (int i = 0; i < pVotesRespond->replicaNum; ++i) { for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) { if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) {
// assert(pVotesRespond->isRespond[i] == false); // ASSERT(pVotesRespond->isRespond[i] == false);
pVotesRespond->isRespond[i] = true; pVotesRespond->isRespond[i] = true;
return; return;
} }
} }
assert(0); ASSERT(0);
} }
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) { void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
...@@ -256,7 +256,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) { ...@@ -256,7 +256,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char *votesRespond2Str(SVotesRespond *pVotesRespond) { char *votesRespond2Str(SVotesRespond *pVotesRespond) {
cJSON *pJson = votesRespond2Json(pVotesRespond); cJSON *pJson = votesRespond2Json(pVotesRespond);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
......
...@@ -34,6 +34,7 @@ add_executable(syncWriteTest "") ...@@ -34,6 +34,7 @@ add_executable(syncWriteTest "")
add_executable(syncReplicateTest "") add_executable(syncReplicateTest "")
add_executable(syncRefTest "") add_executable(syncRefTest "")
add_executable(syncLogStoreCheck "") add_executable(syncLogStoreCheck "")
add_executable(syncLogStoreCheck2 "")
add_executable(syncRaftCfgTest "") add_executable(syncRaftCfgTest "")
add_executable(syncRespMgrTest "") add_executable(syncRespMgrTest "")
add_executable(syncSnapshotTest "") add_executable(syncSnapshotTest "")
...@@ -196,6 +197,10 @@ target_sources(syncLogStoreCheck ...@@ -196,6 +197,10 @@ target_sources(syncLogStoreCheck
PRIVATE PRIVATE
"syncLogStoreCheck.cpp" "syncLogStoreCheck.cpp"
) )
target_sources(syncLogStoreCheck2
PRIVATE
"syncLogStoreCheck2.cpp"
)
target_sources(syncRaftCfgTest target_sources(syncRaftCfgTest
PRIVATE PRIVATE
"syncRaftCfgTest.cpp" "syncRaftCfgTest.cpp"
...@@ -442,6 +447,11 @@ target_include_directories(syncLogStoreCheck ...@@ -442,6 +447,11 @@ target_include_directories(syncLogStoreCheck
"${TD_SOURCE_DIR}/include/libs/sync" "${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc" "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
) )
target_include_directories(syncLogStoreCheck2
PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRaftCfgTest target_include_directories(syncRaftCfgTest
PUBLIC PUBLIC
"${TD_SOURCE_DIR}/include/libs/sync" "${TD_SOURCE_DIR}/include/libs/sync"
...@@ -668,6 +678,10 @@ target_link_libraries(syncLogStoreCheck ...@@ -668,6 +678,10 @@ target_link_libraries(syncLogStoreCheck
sync sync
gtest_main gtest_main
) )
target_link_libraries(syncLogStoreCheck2
sync
gtest_main
)
target_link_libraries(syncRaftCfgTest target_link_libraries(syncRaftCfgTest
sync sync
gtest_main gtest_main
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
void init() {
int code = walInit();
assert(code == 0);
}
void cleanup() { walCleanUp(); }
SWal* createWal(char* path, int32_t vgId) {
SWalCfg walCfg;
memset(&walCfg, 0, sizeof(SWalCfg));
walCfg.vgId = vgId;
walCfg.fsyncPeriod = 1000;
walCfg.retentionPeriod = 1000;
walCfg.rollPeriod = 1000;
walCfg.retentionSize = 1000;
walCfg.segSize = 1000;
walCfg.level = TAOS_WAL_FSYNC;
SWal* pWal = walOpen(path, &walCfg);
assert(pWal != NULL);
return pWal;
}
SSyncNode* createSyncNode(SWal* pWal) {
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
memset(pSyncNode, 0, sizeof(SSyncNode));
pSyncNode->pWal = pWal;
return pSyncNode;
}
void usage(char* exe) { printf("usage: %s path vgId \n", exe); }
int main(int argc, char** argv) {
if (argc != 3) {
usage(argv[0]);
exit(-1);
}
char* path = argv[1];
int32_t vgId = atoi(argv[2]);
init();
SWal* pWal = createWal(path, vgId);
assert(pWal != NULL);
SSyncNode* pSyncNode = createSyncNode(pWal);
assert(pSyncNode != NULL);
SSyncLogStore* pLog = logStoreCreate(pSyncNode);
assert(pLog != NULL);
logStorePrint2((char*)"==syncLogStoreCheck==", pLog);
walClose(pWal);
logStoreDestory(pLog);
taosMemoryFree(pSyncNode);
cleanup();
return 0;
}
...@@ -69,6 +69,7 @@ void test1() { ...@@ -69,6 +69,7 @@ void test1() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore);
if (gAssert) { if (gAssert) {
...@@ -88,6 +89,7 @@ void test1() { ...@@ -88,6 +89,7 @@ void test1() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest1 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest1 restart ----- ", pLogStore);
if (gAssert) { if (gAssert) {
...@@ -110,6 +112,7 @@ void test2() { ...@@ -110,6 +112,7 @@ void test2() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
pLogStore->syncLogSetBeginIndex(pLogStore, 5); pLogStore->syncLogSetBeginIndex(pLogStore, 5);
logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore);
...@@ -130,6 +133,7 @@ void test2() { ...@@ -130,6 +133,7 @@ void test2() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest2 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest2 restart ----- ", pLogStore);
if (gAssert) { if (gAssert) {
...@@ -152,6 +156,7 @@ void test3() { ...@@ -152,6 +156,7 @@ void test3() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore);
if (gAssert) { if (gAssert) {
...@@ -198,6 +203,7 @@ void test3() { ...@@ -198,6 +203,7 @@ void test3() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest3 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest3 restart ----- ", pLogStore);
if (gAssert) { if (gAssert) {
...@@ -220,6 +226,7 @@ void test4() { ...@@ -220,6 +226,7 @@ void test4() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5); pLogStore->syncLogSetBeginIndex(pLogStore, 5);
...@@ -257,6 +264,7 @@ void test4() { ...@@ -257,6 +264,7 @@ void test4() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest4 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest4 restart ----- ", pLogStore);
if (gAssert) { if (gAssert) {
...@@ -279,6 +287,7 @@ void test5() { ...@@ -279,6 +287,7 @@ void test5() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5); pLogStore->syncLogSetBeginIndex(pLogStore, 5);
...@@ -329,6 +338,7 @@ void test5() { ...@@ -329,6 +338,7 @@ void test5() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest5 restart ----- ", pLogStore);
if (gAssert) { if (gAssert) {
...@@ -351,6 +361,7 @@ void test6() { ...@@ -351,6 +361,7 @@ void test6() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest6 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest6 ----- ", pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5); pLogStore->syncLogSetBeginIndex(pLogStore, 5);
...@@ -401,6 +412,7 @@ void test6() { ...@@ -401,6 +412,7 @@ void test6() {
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore);
if (gAssert) { if (gAssert) {
......
...@@ -259,6 +259,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p ...@@ -259,6 +259,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
int64_t code; int64_t code;
if (pRead->pWal->vers.firstVer == -1) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1;
}
// TODO: check wal life // TODO: check wal life
if (pRead->curVersion != ver) { if (pRead->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) { if (walReadSeekVer(pRead, ver) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册