未验证 提交 79a8ebb3 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18016 from taosdata/fix/TD-20052

refact: remove assert and adjust log
......@@ -20,12 +20,8 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
......
......@@ -20,12 +20,8 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
......
......@@ -20,11 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
// \* Leader i advances its commitIndex.
// \* This is done as a separate step from handling AppendEntries responses,
......
......@@ -20,11 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
// TLA+ Spec
// RequestVote(i, j) ==
......
......@@ -20,11 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
// SIndexMgr -----------------------------
typedef struct SSyncIndexMgr {
......
......@@ -20,12 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "syncInt.h"
#include "taosdef.h"
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
......
......@@ -20,12 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "syncInt.h"
#include "taosdef.h"
#define CONFIG_FILE_LEN 2048
......
......@@ -20,13 +20,8 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
#include "tref.h"
#include "tskiplist.h"
typedef struct SSyncRaftEntry {
......
......@@ -20,12 +20,8 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncRaftEntry.h"
#include "taosdef.h"
#include "wal.h"
typedef struct SSyncLogStoreData {
......
......@@ -20,12 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "syncInt.h"
#include "taosdef.h"
#define RAFT_STORE_BLOCK_SIZE 512
#define RAFT_STORE_PATH_LEN (TSDB_FILENAME_LEN * 2)
......
......@@ -20,11 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
// TLA+ Spec
// AppendEntries(i, j) ==
......
......@@ -20,12 +20,8 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
// TLA+ Spec
// HandleRequestVoteRequest(i, j, m) ==
......
......@@ -20,12 +20,8 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
// TLA+ Spec
// HandleRequestVoteResponse(i, j, m) ==
......
......@@ -20,11 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "taosdef.h"
typedef struct SRespStub {
SRpcMsg rpcMsg;
......
......@@ -20,13 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "cJSON.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
......@@ -36,7 +30,6 @@ extern "C" {
#define SYNC_SNAPSHOT_RETRY_MS 5000
//---------------------------------------------------
typedef struct SSyncSnapshotSender {
bool start;
int32_t seq;
......@@ -65,12 +58,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finis
int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId);
//---------------------------------------------------
typedef struct SSyncSnapshotReceiver {
bool start;
int32_t ack;
......@@ -83,7 +70,6 @@ typedef struct SSyncSnapshotReceiver {
// init when create
SSyncNode *pSyncNode;
} SSyncSnapshotReceiver;
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId);
......@@ -93,13 +79,10 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
//---------------------------------------------------
// on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg);
int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg);
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId);
#ifdef __cplusplus
}
......
......@@ -20,12 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "taosdef.h"
// TLA+ Spec
// Timeout(i) == /\ state[i] \in {Follower, Candidate}
......
......@@ -728,7 +728,6 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);
// -----------------------------------------
......
......@@ -24,17 +24,15 @@ extern "C" {
uint64_t syncUtilAddr2U64(const char* host, uint16_t port);
void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port);
void syncUtilnodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet);
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId);
void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet);
void syncUtilRaftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
bool syncUtilEmptyId(const SRaftId* pId);
int32_t syncUtilElectRandomMS(int32_t min, int32_t max);
int32_t syncUtilQuorum(int32_t replicaNum);
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
cJSON* syncUtilRaftId2Json(const SRaftId* p);
char* syncUtilRaftId2Str(const SRaftId* p);
const char* syncStr(ESyncState state);
char* syncUtilPrintBin(char* ptr, uint32_t len);
char* syncUtilPrintBin2(char* ptr, uint32_t len);
......
......@@ -39,8 +39,6 @@ void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncN
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
cJSON *voteGranted2Json(SVotesGranted *pVotesGranted);
char *voteGranted2Str(SVotesGranted *pVotesGranted);
typedef struct SVotesRespond {
SRaftId (*replicas)[TSDB_MAX_REPLICA];
......@@ -56,8 +54,6 @@ void votesRespondUpdate(SVotesRespond *pVotesRespond, SSyncNode *pSync
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
cJSON *votesRespond2Json(SVotesRespond *pVotesRespond);
char *votesRespond2Str(SVotesRespond *pVotesRespond);
#ifdef __cplusplus
}
......
......@@ -757,7 +757,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
if (!syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) {
sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId);
goto _error;
}
......@@ -772,7 +772,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
}
}
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
if (!syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) {
sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i);
goto _error;
}
......@@ -781,7 +781,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
if (!syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
if (!syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) {
sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i);
goto _error;
}
......@@ -1213,7 +1213,7 @@ int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
// utils --------------
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
syncUtilRaftId2EpSet(destRaftId, &epSet);
if (pSyncNode->syncSendMSg != NULL) {
// htonl
syncUtilMsgHtoN(pMsg->pCont);
......@@ -1230,7 +1230,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet;
syncUtilnodeInfo2EpSet(nodeInfo, &epSet);
syncUtilNodeInfo2EpSet(nodeInfo, &epSet);
if (pSyncNode->syncSendMSg != NULL) {
// htonl
syncUtilMsgHtoN(pMsg->pCont);
......@@ -1344,7 +1344,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// init internal
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
// init peersNum, peers, peersId
pSyncNode->peersNum = pSyncNode->pRaftCfg->cfg.replicaNum - 1;
......@@ -1356,13 +1356,13 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
}
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i]);
}
// init replicaNum, replicasId
pSyncNode->replicaNum = pSyncNode->pRaftCfg->cfg.replicaNum;
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
syncUtilNodeInfo2RaftId(&pSyncNode->pRaftCfg->cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i]);
}
// update quorum first
......
......@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "syncRaftEntry.h"
#include "syncUtil.h"
#include "tref.h"
SSyncRaftEntry* syncEntryBuild(int32_t dataLen) {
int32_t bytes = sizeof(SSyncRaftEntry) + dataLen;
......
......@@ -13,33 +13,25 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "syncSnapshot.h"
#include "syncIndexMgr.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
//----------------------------------
static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg);
static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg);
static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg);
//----------------------------------
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoRead != NULL);
SSyncSnapshotSender *pSender = NULL;
if (condition) {
pSender = taosMemoryMalloc(sizeof(SSyncSnapshotSender));
pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender));
if (pSender == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pSender, 0, sizeof(*pSender));
pSender->start = false;
pSender->seq = SYNC_SNAPSHOT_SEQ_INVALID;
......@@ -249,64 +241,6 @@ static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnaps
++(pSender->seq);
}
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char u64buf[128];
cJSON *pRoot = cJSON_CreateObject();
if (pSender != NULL) {
cJSON_AddNumberToObject(pRoot, "start", pSender->start);
cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);
snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
cJSON_AddStringToObject(pRoot, "pReader", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);
if (pSender->pCurrentBlock != NULL) {
char *s;
s = syncUtilPrintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
taosMemoryFree(s);
}
cJSON *pSnapshot = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyIndex);
cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyTerm);
cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->sendingMS);
cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
// snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm);
// cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
return pJson;
}
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
sNTrace(pSyncNode, "starting snapshot ...");
......@@ -335,16 +269,17 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
return 0;
}
// -------------------------------------
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
(pSyncNode->pFsm->FpSnapshotDoWrite != NULL);
SSyncSnapshotReceiver *pReceiver = NULL;
if (condition) {
pReceiver = taosMemoryMalloc(sizeof(SSyncSnapshotReceiver));
ASSERT(pReceiver != NULL);
memset(pReceiver, 0, sizeof(*pReceiver));
pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver));
if (pReceiver == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pReceiver->start = false;
pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN;
......@@ -530,63 +465,6 @@ static void snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
}
}
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char u64buf[128];
cJSON *pRoot = cJSON_CreateObject();
if (pReceiver != NULL) {
cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);
snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
cJSON_AddStringToObject(pRoot, "pWriter", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
cJSON *pFromId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->fromId.addr);
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
cJSON_AddItemToObject(pRoot, "fromId", pFromId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyIndex);
cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyTerm);
cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastConfigIndex);
cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->startTime);
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
return pJson;
}
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
SyncIndex snapStart = SYNC_INDEX_INVALID;
......
......@@ -13,28 +13,23 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "syncTimeout.h"
#include "syncElection.h"
#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncReplication.h"
#include "syncRespMgr.h"
static void syncNodeCleanConfigIndex(SSyncNode* ths) {
int32_t newArrIndex = 0;
SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT];
memset(newConfigIndexArr, 0, sizeof(newConfigIndexArr));
SyncIndex newConfigIndexArr[MAX_CONFIG_INDEX_COUNT] = {0};
SSnapshot snapshot = {0};
if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) {
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
}
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex != SYNC_INDEX_INVALID) {
for (int i = 0; i < ths->pRaftCfg->configIndexCount; ++i) {
for (int32_t i = 0; i < ths->pRaftCfg->configIndexCount; ++i) {
if (ths->pRaftCfg->configIndexArr[i] < snapshot.lastConfigIndex) {
// pass
;
} else {
// save
newConfigIndexArr[newArrIndex] = ths->pRaftCfg->configIndexArr[i];
......@@ -47,13 +42,15 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) {
memcpy(ths->pRaftCfg->configIndexArr, newConfigIndexArr, sizeof(newConfigIndexArr));
int32_t code = raftCfgPersist(ths->pRaftCfg);
ASSERT(code == 0);
sNTrace(ths, "clean config index arr, old-cnt:%d, new-cnt:%d", oldCnt, ths->pRaftCfg->configIndexCount);
if (code != 0) {
sNFatal(ths, "failed to persist cfg");
} else {
sNTrace(ths, "clean config index arr, old-cnt:%d, new-cnt:%d", oldCnt, ths->pRaftCfg->configIndexCount);
}
}
}
int32_t syncNodeTimerRoutine(SSyncNode* ths) {
static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
sNTrace(ths, "timer routines");
// timer replicate
......@@ -71,7 +68,7 @@ int32_t syncNodeTimerRoutine(SSyncNode* ths) {
SSyncLogStoreData* pData = ths->pLogStore->data;
int32_t code = walEndSnapshot(pData->pWal);
if (code != 0) {
sError("vgId:%d, timer wal snapshot end error since:%s", ths->vgId, terrstr());
sNError(ths, "timer wal snapshot end error since:%s", terrstr());
return -1;
} else {
sNTrace(ths, "wal snapshot end, index:%" PRId64, atomic_load_64(&ths->snapshottingIndex));
......
......@@ -41,15 +41,15 @@ void syncUtilU642Addr(uint64_t u64, char* host, int64_t len, uint16_t* port) {
*port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16);
}
void syncUtilnodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
pEpSet->inUse = 0;
pEpSet->numOfEps = 0;
addEpIntoEpSet(pEpSet, pInfo->nodeFqdn, pInfo->nodePort);
}
void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
void syncUtilRaftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
char host[TSDB_FQDN_LEN] = {0};
uint16_t port;
uint16_t port = 0;
syncUtilU642Addr(raftId->addr, host, sizeof(host), &port);
pEpSet->inUse = 0;
......@@ -57,7 +57,7 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) {
addEpIntoEpSet(pEpSet, host, port);
}
bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
uint32_t ipv4 = taosGetIpv4FromFqdn(pInfo->nodeFqdn);
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
sError("failed to resolve ipv4 addr, fqdn: %s", pInfo->nodeFqdn);
......@@ -73,8 +73,7 @@ bool syncUtilnodeInfo2raftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId*
}
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId;
return ret;
return pId1->addr == pId2->addr && pId1->vgId == pId2->vgId;
}
bool syncUtilEmptyId(const SRaftId* pId) { return (pId->addr == 0 && pId->vgId == 0); }
......@@ -90,18 +89,6 @@ int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; }
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddStringToObject(pRoot, "nodeFqdn", p->nodeFqdn);
cJSON_AddNumberToObject(pRoot, "nodePort", p->nodePort);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot);
return pJson;
}
cJSON* syncUtilRaftId2Json(const SRaftId* p) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
......@@ -120,13 +107,6 @@ cJSON* syncUtilRaftId2Json(const SRaftId* p) {
return pJson;
}
char* syncUtilRaftId2Str(const SRaftId* p) {
cJSON* pJson = syncUtilRaftId2Json(p);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
static inline bool syncUtilCanPrint(char c) {
if (c >= 32 && c <= 126) {
return true;
......@@ -165,14 +145,12 @@ char* syncUtilPrintBin2(char* ptr, uint32_t len) {
}
void syncUtilMsgHtoN(void* msg) {
// htonl
SMsgHead* pHead = msg;
pHead->contLen = htonl(pHead->contLen);
pHead->vgId = htonl(pHead->vgId);
}
void syncUtilMsgNtoH(void* msg) {
// ntohl
SMsgHead* pHead = msg;
pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId);
......
......@@ -23,12 +23,11 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
}
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted));
SVotesGranted *pVotesGranted = taosMemoryCalloc(1, sizeof(SVotesGranted));
if (pVotesGranted == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pVotesGranted, 0, sizeof(SVotesGranted));
pVotesGranted->replicas = &(pSyncNode->replicasId);
pVotesGranted->replicaNum = pSyncNode->replicaNum;
......@@ -59,20 +58,24 @@ void voteGrantedUpdate(SVotesGranted *pVotesGranted, SSyncNode *pSyncNode) {
pVotesGranted->pSyncNode = pSyncNode;
}
bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
return ret;
}
bool voteGrantedMajority(SVotesGranted *pVotesGranted) { return pVotesGranted->votes >= pVotesGranted->quorum; }
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
ASSERT(pMsg->voteGranted == true);
if (!pMsg->voteGranted) {
sNFatal(pVotesGranted->pSyncNode, "vote granted should be true");
return;
}
if (pMsg->term != pVotesGranted->term) {
sNTrace(pVotesGranted->pSyncNode, "vote grant vnode error");
sNTrace(pVotesGranted->pSyncNode, "vote grant term:%" PRId64 " not matched with msg term:%" PRId64,
pVotesGranted->term, pMsg->term);
return;
}
ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId));
if (!syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)) {
sNFatal(pVotesGranted->pSyncNode, "vote granted raftId not matched with msg");
return;
}
int32_t j = -1;
for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
......@@ -81,14 +84,21 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
break;
}
}
ASSERT(j != -1);
ASSERT(j >= 0 && j < pVotesGranted->replicaNum);
if ((j == -1) || !(j >= 0 && j < pVotesGranted->replicaNum)) {
sNFatal(pVotesGranted->pSyncNode, "invalid msg srcId, index:%d", j);
return;
}
if (pVotesGranted->isGranted[j] != true) {
++(pVotesGranted->votes);
pVotesGranted->isGranted[j] = true;
}
ASSERT(pVotesGranted->votes <= pVotesGranted->replicaNum);
if (pVotesGranted->votes > pVotesGranted->replicaNum) {
sNFatal(pVotesGranted->pSyncNode, "votes:%d not matched with replicaNum:%d", pVotesGranted->votes,
pVotesGranted->replicaNum);
return;
}
}
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
......@@ -97,53 +107,12 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
pVotesGranted->toLeader = false;
}
cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char u64buf[128] = {0};
cJSON *pRoot = cJSON_CreateObject();
if (pVotesGranted != NULL) {
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
cJSON *pReplicas = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i]));
}
int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesGranted->replicaNum);
for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
arr[i] = pVotesGranted->isGranted[i];
}
cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum);
taosMemoryFree(arr);
cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);
cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesGranted->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
bool majority = voteGrantedMajority(pVotesGranted);
cJSON_AddNumberToObject(pRoot, "majority", majority);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
return pJson;
}
char *voteGranted2Str(SVotesGranted *pVotesGranted) {
cJSON *pJson = voteGranted2Json(pVotesGranted);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
SVotesRespond *pVotesRespond = taosMemoryMalloc(sizeof(SVotesRespond));
ASSERT(pVotesRespond != NULL);
memset(pVotesRespond, 0, sizeof(SVotesRespond));
SVotesRespond *pVotesRespond = taosMemoryCalloc(1, sizeof(SVotesRespond));
if (pVotesRespond == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pVotesRespond->replicas = &(pSyncNode->replicasId);
pVotesRespond->replicaNum = pSyncNode->replicaNum;
......@@ -185,62 +154,15 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) {
// ASSERT(pVotesRespond->isRespond[i] == false);
pVotesRespond->isRespond[i] = true;
return;
}
}
ASSERT(0);
sNFatal(pVotesRespond->pSyncNode, "votes respond not found");
}
void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) {
pVotesRespond->term = term;
memset(pVotesRespond->isRespond, 0, sizeof(pVotesRespond->isRespond));
/*
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
pVotesRespond->isRespond[i] = false;
}
*/
}
cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char u64buf[128] = {0};
cJSON *pRoot = cJSON_CreateObject();
if (pVotesRespond != NULL) {
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
cJSON *pReplicas = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i]));
}
int32_t respondNum = 0;
int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesRespond->replicaNum);
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
arr[i] = pVotesRespond->isRespond[i];
if (pVotesRespond->isRespond[i]) {
respondNum++;
}
}
cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum);
taosMemoryFree(arr);
cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond);
cJSON_AddNumberToObject(pRoot, "respondNum", respondNum);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesRespond->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot);
return pJson;
}
char *votesRespond2Str(SVotesRespond *pVotesRespond) {
cJSON *pJson = votesRespond2Json(pVotesRespond);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncUtil.h"
#include "trpc.h"
#include "syncTest.h"
void logTest() {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "os.h"
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "os.h"
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncTest.h"
#include "syncUtil.h"
#include "wal.h"
#if 0
void logTest() {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "tref.h"
#include "tskiplist.h"
#include "syncTest.h"
void logTest() {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
......
#include "syncEnv.h"
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "ttime.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncTest.h"
#include "syncUtil.h"
#include "tskiplist.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "tdatablock.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......@@ -101,7 +95,7 @@ int main(int argc, char** argv) {
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
SEpSet epSet;
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
syncUtilNodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
rpcMsg.info.noResp = 1;
pSyncNode->syncSendMSg(&epSet, &rpcMsg);
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include "syncIndexMgr.h"
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncTest.h"
void print(SHashObj *pNextIndex) {
printf("----------------\n");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncTest.h"
#include "syncUtil.h"
#include "wal.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncTest.h"
void logTest() {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncTest.h"
#include "syncUtil.h"
#include "wal.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include "syncRaftStore.h"
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include "syncRaftStore.h"
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncUtil.h"
#include "syncTest.h"
void usage(char* exe) {
printf("Usage: %s host port \n", exe);
......
#include "syncRaftLog.h"
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncTest.h"
#include "syncUtil.h"
#include "wal.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include "syncRaftStore.h"
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "tref.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncTest.h"
/*
typedef enum {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "os.h"
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftCfg.h"
#include "syncUtil.h"
#include "wal.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include "syncUtil.h"
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "syncTest.h"
void logTest() {
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
#include "syncTest.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncTest.h"
#include "syncUtil.h"
#include "wal.h"
void logTest() {
sTrace("--- sync log test: trace");
......
......@@ -22,8 +22,10 @@ extern "C" {
#include "syncInt.h"
#include "tref.h"
#include "wal.h"
#include "tref.h"
#include "syncEnv.h"
#include "syncIO.h"
#include "syncIndexMgr.h"
......@@ -38,6 +40,8 @@ extern "C" {
#include "syncUtil.h"
#include "syncVoteMgr.h"
extern void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
void syncEntryPrint(const SSyncRaftEntry* pObj);
......@@ -82,6 +86,18 @@ void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
cJSON* voteGranted2Json(SVotesGranted* pVotesGranted);
char* voteGranted2Str(SVotesGranted* pVotesGranted);
cJSON* votesRespond2Json(SVotesRespond* pVotesRespond);
char* votesRespond2Str(SVotesRespond* pVotesRespond);
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p);
char* syncUtilRaftId2Str(const SRaftId* p);
cJSON* snapshotSender2Json(SSyncSnapshotSender* pSender);
char* snapshotSender2Str(SSyncSnapshotSender* pSender);
cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver);
char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver);
#ifdef __cplusplus
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "syncTest.h"
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char u64buf[128];
cJSON *pRoot = cJSON_CreateObject();
if (pSender != NULL) {
cJSON_AddNumberToObject(pRoot, "start", pSender->start);
cJSON_AddNumberToObject(pRoot, "seq", pSender->seq);
cJSON_AddNumberToObject(pRoot, "ack", pSender->ack);
snprintf(u64buf, sizeof(u64buf), "%p", pSender->pReader);
cJSON_AddStringToObject(pRoot, "pReader", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSender->pCurrentBlock);
cJSON_AddStringToObject(pRoot, "pCurrentBlock", u64buf);
cJSON_AddNumberToObject(pRoot, "blockLen", pSender->blockLen);
if (pSender->pCurrentBlock != NULL) {
char *s;
s = syncUtilPrintBin((char *)(pSender->pCurrentBlock), pSender->blockLen);
cJSON_AddStringToObject(pRoot, "pCurrentBlock", s);
taosMemoryFree(s);
s = syncUtilPrintBin2((char *)(pSender->pCurrentBlock), pSender->blockLen);
cJSON_AddStringToObject(pRoot, "pCurrentBlock2", s);
taosMemoryFree(s);
}
cJSON *pSnapshot = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyIndex);
cJSON_AddStringToObject(pSnapshot, "lastApplyIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->snapshot.lastApplyTerm);
cJSON_AddStringToObject(pSnapshot, "lastApplyTerm", u64buf);
cJSON_AddItemToObject(pRoot, "snapshot", pSnapshot);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->sendingMS);
cJSON_AddStringToObject(pRoot, "sendingMS", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSender->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
cJSON_AddNumberToObject(pRoot, "replicaIndex", pSender->replicaIndex);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
// snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm);
// cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
cJSON_AddNumberToObject(pRoot, "finish", pSender->finish);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncSnapshotSender", pRoot);
return pJson;
}
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char u64buf[128];
cJSON *pRoot = cJSON_CreateObject();
if (pReceiver != NULL) {
cJSON_AddNumberToObject(pRoot, "start", pReceiver->start);
cJSON_AddNumberToObject(pRoot, "ack", pReceiver->ack);
snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pWriter);
cJSON_AddStringToObject(pRoot, "pWriter", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pReceiver->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
cJSON *pFromId = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->fromId.addr);
cJSON_AddStringToObject(pFromId, "addr", u64buf);
{
uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId;
char host[128] = {0};
uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port);
cJSON_AddStringToObject(pTmp, "addr_host", host);
cJSON_AddNumberToObject(pTmp, "addr_port", port);
}
cJSON_AddNumberToObject(pFromId, "vgId", pReceiver->fromId.vgId);
cJSON_AddItemToObject(pRoot, "fromId", pFromId);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyIndex);
cJSON_AddStringToObject(pRoot, "snapshot.lastApplyIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastApplyTerm);
cJSON_AddStringToObject(pRoot, "snapshot.lastApplyTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->snapshot.lastConfigIndex);
cJSON_AddStringToObject(pRoot, "snapshot.lastConfigIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pReceiver->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%" PRId64, pReceiver->startTime);
cJSON_AddStringToObject(pRoot, "startTime", u64buf);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncSnapshotReceiver", pRoot);
return pJson;
}
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "syncTest.h"
cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p) {
char u64buf[128] = {0};
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddStringToObject(pRoot, "nodeFqdn", p->nodeFqdn);
cJSON_AddNumberToObject(pRoot, "nodePort", p->nodePort);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot);
return pJson;
}
char* syncUtilRaftId2Str(const SRaftId* p) {
cJSON* pJson = syncUtilRaftId2Json(p);
char* serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "syncTest.h"
cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char u64buf[128] = {0};
cJSON *pRoot = cJSON_CreateObject();
if (pVotesGranted != NULL) {
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesGranted->replicaNum);
cJSON *pReplicas = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesGranted->replicas))[i]));
}
int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesGranted->replicaNum);
for (int32_t i = 0; i < pVotesGranted->replicaNum; ++i) {
arr[i] = pVotesGranted->isGranted[i];
}
cJSON *pIsGranted = cJSON_CreateIntArray(arr, pVotesGranted->replicaNum);
taosMemoryFree(arr);
cJSON_AddItemToObject(pRoot, "isGranted", pIsGranted);
cJSON_AddNumberToObject(pRoot, "votes", pVotesGranted->votes);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesGranted->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
cJSON_AddNumberToObject(pRoot, "quorum", pVotesGranted->quorum);
cJSON_AddNumberToObject(pRoot, "toLeader", pVotesGranted->toLeader);
snprintf(u64buf, sizeof(u64buf), "%p", pVotesGranted->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
bool majority = voteGrantedMajority(pVotesGranted);
cJSON_AddNumberToObject(pRoot, "majority", majority);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SVotesGranted", pRoot);
return pJson;
}
char *voteGranted2Str(SVotesGranted *pVotesGranted) {
cJSON *pJson = voteGranted2Json(pVotesGranted);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char u64buf[128] = {0};
cJSON *pRoot = cJSON_CreateObject();
if (pVotesRespond != NULL) {
cJSON_AddNumberToObject(pRoot, "replicaNum", pVotesRespond->replicaNum);
cJSON *pReplicas = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pVotesRespond->replicas))[i]));
}
int32_t respondNum = 0;
int32_t *arr = (int32_t *)taosMemoryMalloc(sizeof(int32_t) * pVotesRespond->replicaNum);
for (int32_t i = 0; i < pVotesRespond->replicaNum; ++i) {
arr[i] = pVotesRespond->isRespond[i];
if (pVotesRespond->isRespond[i]) {
respondNum++;
}
}
cJSON *pIsRespond = cJSON_CreateIntArray(arr, pVotesRespond->replicaNum);
taosMemoryFree(arr);
cJSON_AddItemToObject(pRoot, "isRespond", pIsRespond);
cJSON_AddNumberToObject(pRoot, "respondNum", respondNum);
snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pVotesRespond->term);
cJSON_AddStringToObject(pRoot, "term", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pVotesRespond->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
}
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SVotesRespond", pRoot);
return pJson;
}
char *votesRespond2Str(SVotesRespond *pVotesRespond) {
cJSON *pJson = votesRespond2Json(pVotesRespond);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册