未验证 提交 959bda64 编写于 作者: L Li Minghao 提交者: GitHub

Merge pull request #10710 from taosdata/feature/3.0_mhli

Feature/3.0 mhli
......@@ -42,6 +42,12 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId
cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj);
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj);
void syncIndexMgrLog(SSyncIndexMgr *pObj);
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj);
#ifdef __cplusplus
}
#endif
......
......@@ -211,11 +211,14 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
// for debug
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
void syncNodePrint(char* s, const SSyncNode* pSyncNode);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
void syncNodeLog(SSyncNode* pObj);
void syncNodeLog2(char* s, SSyncNode* pObj);
#ifdef __cplusplus
}
......
......@@ -47,6 +47,12 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
cJSON* syncRpcUnknownMsg2Json();
char* syncRpcMsg2Str(SRpcMsg* pRpcMsg);
// for debug ----------------------
void syncRpcMsgPrint(SRpcMsg* pMsg);
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
void syncRpcMsgLog(SRpcMsg* pMsg);
void syncRpcMsgLog2(char* s, SRpcMsg* pMsg);
// ---------------------------------------------
typedef enum ESyncTimeoutType {
SYNC_TIMEOUT_PING = 100,
......@@ -60,17 +66,27 @@ typedef struct SyncTimeout {
ESyncTimeoutType timeoutType;
uint64_t logicClock;
int32_t timerMS;
void* data;
void* data; // need optimized
} SyncTimeout;
SyncTimeout* syncTimeoutBuild();
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data);
void syncTimeoutDestroy(SyncTimeout* pMsg);
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen);
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg);
char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len); //
SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len); //
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg); //
cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, void* data);
char* syncTimeout2Str(const SyncTimeout* pMsg); //
// for debug ----------------------
void syncTimeoutPrint(const SyncTimeout* pMsg);
void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg);
void syncTimeoutLog(const SyncTimeout* pMsg);
void syncTimeoutLog2(char* s, const SyncTimeout* pMsg);
// ---------------------------------------------
typedef struct SyncPing {
......@@ -83,17 +99,25 @@ typedef struct SyncPing {
char data[];
} SyncPing;
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPing* syncPingBuild(uint32_t dataLen);
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId);
void syncPingDestroy(SyncPing* pMsg);
void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen);
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg);
char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len);
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len);
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg);
void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg);
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncPing2Json(const SyncPing* pMsg);
SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId);
char* syncPing2Str(const SyncPing* pMsg);
// for debug ----------------------
void syncPingPrint(const SyncPing* pMsg);
void syncPingPrint2(char* s, const SyncPing* pMsg);
void syncPingLog(const SyncPing* pMsg);
void syncPingLog2(char* s, const SyncPing* pMsg);
// ---------------------------------------------
typedef struct SyncPingReply {
......@@ -106,18 +130,25 @@ typedef struct SyncPingReply {
char data[];
} SyncPingReply;
#define SYNC_PING_REPLY_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPingReply* syncPingReplyBuild(uint32_t dataLen);
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId);
void syncPingReplyDestroy(SyncPingReply* pMsg);
void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen);
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg);
char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len); //
SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len); //
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg);
void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg);
SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg); //
cJSON* syncPingReply2Json(const SyncPingReply* pMsg);
SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str);
SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId);
char* syncPingReply2Str(const SyncPingReply* pMsg); //
// for debug ----------------------
void syncPingReplyPrint(const SyncPingReply* pMsg);
void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg);
void syncPingReplyLog(const SyncPingReply* pMsg);
void syncPingReplyLog2(char* s, const SyncPingReply* pMsg);
// ---------------------------------------------
typedef struct SyncClientRequest {
......@@ -131,13 +162,23 @@ typedef struct SyncClientRequest {
} SyncClientRequest;
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak);
void syncClientRequestDestroy(SyncClientRequest* pMsg);
void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen);
void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg);
char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len);
SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len);
void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg);
void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak);
char* syncClientRequest2Str(const SyncClientRequest* pMsg);
// for debug ----------------------
void syncClientRequestPrint(const SyncClientRequest* pMsg);
void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg);
void syncClientRequestLog(const SyncClientRequest* pMsg);
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
// ---------------------------------------------
typedef struct SyncClientRequestReply {
......@@ -163,9 +204,19 @@ SyncRequestVote* syncRequestVoteBuild();
void syncRequestVoteDestroy(SyncRequestVote* pMsg);
void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg);
char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len);
SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len);
void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg);
SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg);
char* syncRequestVote2Str(const SyncRequestVote* pMsg);
// for debug ----------------------
void syncRequestVotePrint(const SyncRequestVote* pMsg);
void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg);
void syncRequestVoteLog(const SyncRequestVote* pMsg);
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg);
// ---------------------------------------------
typedef struct SyncRequestVoteReply {
......@@ -178,13 +229,23 @@ typedef struct SyncRequestVoteReply {
bool voteGranted;
} SyncRequestVoteReply;
SyncRequestVoteReply* SyncRequestVoteReplyBuild();
SyncRequestVoteReply* syncRequestVoteReplyBuild();
void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg);
void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen);
void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg);
char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len);
SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len);
void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg);
void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg);
SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg);
char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg);
// for debug ----------------------
void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
// ---------------------------------------------
typedef struct SyncAppendEntries {
......@@ -200,17 +261,23 @@ typedef struct SyncAppendEntries {
char data[];
} SyncAppendEntries;
#define SYNC_APPEND_ENTRIES_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(SyncIndex) + sizeof(SyncTerm) + \
sizeof(SyncIndex) + sizeof(uint32_t))
SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen);
void syncAppendEntriesDestroy(SyncAppendEntries* pMsg);
void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg);
char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len);
SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len);
void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg);
void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg);
SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg);
char* syncAppendEntries2Str(const SyncAppendEntries* pMsg);
// for debug ----------------------
void syncAppendEntriesPrint(const SyncAppendEntries* pMsg);
void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg);
void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
// ---------------------------------------------
typedef struct SyncAppendEntriesReply {
......@@ -227,9 +294,19 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild();
void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen);
void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg);
char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len);
SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len);
void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg);
void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg);
SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg);
char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg);
// for debug ----------------------
void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
#ifdef __cplusplus
}
......
......@@ -46,8 +46,12 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len);
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len);
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
void syncEntryPrint(const SSyncRaftEntry* pEntry);
void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry);
// for debug ----------------------
void syncEntryPrint(const SSyncRaftEntry* pObj);
void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj);
void syncEntryLog(const SSyncRaftEntry* pObj);
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj);
#ifdef __cplusplus
}
......
......@@ -32,39 +32,24 @@ typedef struct SSyncLogStoreData {
SWal* pWal;
} SSyncLogStoreData;
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
void logStoreDestory(SSyncLogStore* pLogStore);
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
// get one log entry, user need to free pEntry->pCont
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
void logStoreDestory(SSyncLogStore* pLogStore);
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
// return index of last entry
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
// return term of last entry
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);
// for debug
void logStorePrint(SSyncLogStore* pLogStore);
void logStorePrint2(char* s, SSyncLogStore* pLogStore);
void logStoreLog(SSyncLogStore* pLogStore);
void logStoreLog2(char* s, SSyncLogStore* pLogStore);
#ifdef __cplusplus
}
......
......@@ -42,7 +42,12 @@ int32_t raftStoreClose(SRaftStore *pRaftStore);
int32_t raftStorePersist(SRaftStore *pRaftStore);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
void raftStorePrint(SRaftStore *pRaftStore);
// for debug -------------------
void raftStorePrint(SRaftStore *pObj);
void raftStorePrint2(char *s, SRaftStore *pObj);
void raftStoreLog(SRaftStore *pObj);
void raftStoreLog2(char *s, SRaftStore *pObj);
#ifdef __cplusplus
}
......
......@@ -48,6 +48,12 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
cJSON * voteGranted2Json(SVotesGranted *pVotesGranted);
char * voteGranted2Str(SVotesGranted *pVotesGranted);
// for debug -------------------
void voteGrantedPrint(SVotesGranted *pObj);
void voteGrantedPrint2(char *s, SVotesGranted *pObj);
void voteGrantedLog(SVotesGranted *pObj);
void voteGrantedLog2(char *s, SVotesGranted *pObj);
// SVotesRespond -----------------------------
typedef struct SVotesRespond {
SRaftId (*replicas)[TSDB_MAX_REPLICA];
......@@ -65,6 +71,12 @@ void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
cJSON * votesRespond2Json(SVotesRespond *pVotesRespond);
char * votesRespond2Str(SVotesRespond *pVotesRespond);
// for debug -------------------
void votesRespondPrint(SVotesRespond *pObj);
void votesRespondPrint2(char *s, SVotesRespond *pObj);
void votesRespondLog(SVotesRespond *pObj);
void votesRespondLog2(char *s, SVotesRespond *pObj);
#ifdef __cplusplus
}
#endif
......
......@@ -263,7 +263,7 @@ static void *syncIOConsumerFunc(void *param) {
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply != NULL) {
SyncRequestVoteReply *pSyncMsg;
pSyncMsg = SyncRequestVoteReplyBuild();
pSyncMsg = syncRequestVoteReplyBuild();
syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
......
......@@ -97,4 +97,31 @@ char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncIndexMgrLog(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
......@@ -327,11 +327,31 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return serialized;
}
void syncNodePrint(char* s, const SSyncNode* pSyncNode) {
char* ss = syncNode2Str(pSyncNode);
// sTrace("syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss);
fprintf(stderr, "syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss);
free(ss);
// for debug --------------
void syncNodePrint(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncNodePrint2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncNodeLog(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncNodeLog2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
......
此差异已折叠。
......@@ -104,14 +104,29 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
return serialized;
}
void syncEntryPrint(const SSyncRaftEntry* pEntry) {
char* s = syncEntry2Str(pEntry);
sTrace("%s", s);
free(s);
// for debug ----------------------
void syncEntryPrint(const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pObj);
printf("syncEntryPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pObj);
printf("syncEntryPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncEntryLog(const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pObj);
sTrace("syncEntryLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) {
char* ss = syncEntry2Str(pEntry);
sTrace("%s | %s", s, ss);
free(ss);
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pObj);
sTrace("syncEntryLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
......@@ -43,7 +43,6 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
}
}
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......@@ -61,7 +60,6 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
free(serialized);
}
// get one log entry, user need to free pEntry->pCont
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......@@ -77,14 +75,12 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
return pEntry;
}
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walRollback(pWal, fromIndex);
}
// return index of last entry
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
......@@ -92,7 +88,6 @@ SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
return lastIndex;
}
// return term of last entry
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
......@@ -103,14 +98,12 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
return lastTerm;
}
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walCommit(pWal, index);
}
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
return pData->pSyncNode->commitIndex;
......@@ -163,11 +156,29 @@ char* logStore2Str(SSyncLogStore* pLogStore) {
return serialized;
}
// for debug
// for debug -----------------
void logStorePrint(SSyncLogStore* pLogStore) {
char* s = logStore2Str(pLogStore);
// sTrace("%s", s);
fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s);
char* serialized = logStore2Str(pLogStore);
printf("logStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
printf("logStorePrint | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
free(s);
void logStoreLog(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
sTrace("logStorePrint | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
......@@ -135,8 +135,30 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
return 0;
}
void raftStorePrint(SRaftStore *pRaftStore) {
char storeBuf[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf));
printf("%s\n", storeBuf);
// for debug -------------------
void raftStorePrint(SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
}
void raftStorePrint2(char *s, SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
}
void raftStoreLog(SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized);
fflush(NULL);
}
void raftStoreLog2(char *s, SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
fflush(NULL);
}
......@@ -119,6 +119,33 @@ char *voteGranted2Str(SVotesGranted *pVotesGranted) {
return serialized;
}
// for debug -------------------
void voteGrantedPrint(SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
printf("voteGrantedPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void voteGrantedPrint2(char *s, SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
printf("voteGrantedPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void voteGrantedLog(SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
sTrace("voteGrantedLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void voteGrantedLog2(char *s, SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
sTrace("voteGrantedLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
// SVotesRespond -----------------------------
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond));
......@@ -210,4 +237,31 @@ char *votesRespond2Str(SVotesRespond *pVotesRespond) {
char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
// for debug -------------------
void votesRespondPrint(SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
printf("votesRespondPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void votesRespondPrint2(char *s, SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
printf("votesRespondPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void votesRespondLog(SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
sTrace("votesRespondLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void votesRespondLog2(char *s, SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
sTrace("votesRespondLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
\ No newline at end of file
add_executable(syncTest "")
add_executable(syncEnvTest "")
add_executable(syncPingTest "")
add_executable(syncEncodeTest "")
add_executable(syncPingTimerTest "")
add_executable(syncIOTickQTest "")
add_executable(syncIOTickPingTest "")
add_executable(syncIOSendMsgTest "")
......@@ -17,6 +16,15 @@ add_executable(syncVotesRespondTest "")
add_executable(syncIndexMgrTest "")
add_executable(syncLogStoreTest "")
add_executable(syncEntryTest "")
add_executable(syncRequestVoteTest "")
add_executable(syncRequestVoteReplyTest "")
add_executable(syncAppendEntriesTest "")
add_executable(syncAppendEntriesReplyTest "")
add_executable(syncClientRequestTest "")
add_executable(syncTimeoutTest "")
add_executable(syncPingTest "")
add_executable(syncPingReplyTest "")
add_executable(syncRpcMsgTest "")
target_sources(syncTest
......@@ -27,13 +35,9 @@ target_sources(syncEnvTest
PRIVATE
"syncEnvTest.cpp"
)
target_sources(syncPingTest
target_sources(syncPingTimerTest
PRIVATE
"syncPingTest.cpp"
)
target_sources(syncEncodeTest
PRIVATE
"syncEncodeTest.cpp"
"syncPingTimerTest.cpp"
)
target_sources(syncIOTickQTest
PRIVATE
......@@ -95,6 +99,42 @@ target_sources(syncEntryTest
PRIVATE
"syncEntryTest.cpp"
)
target_sources(syncRequestVoteTest
PRIVATE
"syncRequestVoteTest.cpp"
)
target_sources(syncRequestVoteReplyTest
PRIVATE
"syncRequestVoteReplyTest.cpp"
)
target_sources(syncAppendEntriesTest
PRIVATE
"syncAppendEntriesTest.cpp"
)
target_sources(syncAppendEntriesReplyTest
PRIVATE
"syncAppendEntriesReplyTest.cpp"
)
target_sources(syncClientRequestTest
PRIVATE
"syncClientRequestTest.cpp"
)
target_sources(syncTimeoutTest
PRIVATE
"syncTimeoutTest.cpp"
)
target_sources(syncPingTest
PRIVATE
"syncPingTest.cpp"
)
target_sources(syncPingReplyTest
PRIVATE
"syncPingReplyTest.cpp"
)
target_sources(syncRpcMsgTest
PRIVATE
"syncRpcMsgTest.cpp"
)
target_include_directories(syncTest
......@@ -107,12 +147,7 @@ target_include_directories(syncEnvTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncPingTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncEncodeTest
target_include_directories(syncPingTimerTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
......@@ -192,6 +227,51 @@ target_include_directories(syncEntryTest
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRequestVoteTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRequestVoteReplyTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncAppendEntriesTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncAppendEntriesReplyTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncClientRequestTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncTimeoutTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncPingTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncPingReplyTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_include_directories(syncRpcMsgTest
PUBLIC
"${CMAKE_SOURCE_DIR}/include/libs/sync"
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
)
target_link_libraries(syncTest
......@@ -202,11 +282,7 @@ target_link_libraries(syncEnvTest
sync
gtest_main
)
target_link_libraries(syncPingTest
sync
gtest_main
)
target_link_libraries(syncEncodeTest
target_link_libraries(syncPingTimerTest
sync
gtest_main
)
......@@ -270,6 +346,42 @@ target_link_libraries(syncEntryTest
sync
gtest_main
)
target_link_libraries(syncRequestVoteTest
sync
gtest_main
)
target_link_libraries(syncRequestVoteReplyTest
sync
gtest_main
)
target_link_libraries(syncAppendEntriesTest
sync
gtest_main
)
target_link_libraries(syncAppendEntriesReplyTest
sync
gtest_main
)
target_link_libraries(syncClientRequestTest
sync
gtest_main
)
target_link_libraries(syncTimeoutTest
sync
gtest_main
)
target_link_libraries(syncPingTest
sync
gtest_main
)
target_link_libraries(syncPingReplyTest
sync
gtest_main
)
target_link_libraries(syncRpcMsgTest
sync
gtest_main
)
enable_testing()
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SyncAppendEntriesReply *createMsg() {
SyncAppendEntriesReply *pMsg = syncAppendEntriesReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->success = true;
pMsg->matchIndex = 77;
return pMsg;
}
void test1() {
SyncAppendEntriesReply *pMsg = createMsg();
syncAppendEntriesReplyPrint2((char *)"test1:", pMsg);
syncAppendEntriesReplyDestroy(pMsg);
}
void test2() {
SyncAppendEntriesReply *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)malloc(len);
syncAppendEntriesReplySerialize(pMsg, serialized, len);
SyncAppendEntriesReply *pMsg2 = syncAppendEntriesReplyBuild();
syncAppendEntriesReplyDeserialize(serialized, len, pMsg2);
syncAppendEntriesReplyPrint2((char *)"test2: syncAppendEntriesReplySerialize -> syncAppendEntriesReplyDeserialize ",
pMsg2);
free(serialized);
syncAppendEntriesReplyDestroy(pMsg);
syncAppendEntriesReplyDestroy(pMsg2);
}
void test3() {
SyncAppendEntriesReply *pMsg = createMsg();
uint32_t len;
char * serialized = syncAppendEntriesReplySerialize2(pMsg, &len);
SyncAppendEntriesReply *pMsg2 = syncAppendEntriesReplyDeserialize2(serialized, len);
syncAppendEntriesReplyPrint2((char *)"test3: syncAppendEntriesReplySerialize3 -> syncAppendEntriesReplyDeserialize2 ",
pMsg2);
free(serialized);
syncAppendEntriesReplyDestroy(pMsg);
syncAppendEntriesReplyDestroy(pMsg2);
}
void test4() {
SyncAppendEntriesReply *pMsg = createMsg();
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg);
SyncAppendEntriesReply *pMsg2 = syncAppendEntriesReplyBuild();
syncAppendEntriesReplyFromRpcMsg(&rpcMsg, pMsg2);
syncAppendEntriesReplyPrint2((char *)"test4: syncAppendEntriesReply2RpcMsg -> syncAppendEntriesReplyFromRpcMsg ",
pMsg2);
syncAppendEntriesReplyDestroy(pMsg);
syncAppendEntriesReplyDestroy(pMsg2);
}
void test5() {
SyncAppendEntriesReply *pMsg = createMsg();
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg);
SyncAppendEntriesReply *pMsg2 = syncAppendEntriesReplyFromRpcMsg2(&rpcMsg);
syncAppendEntriesReplyPrint2((char *)"test5: syncAppendEntriesReply2RpcMsg -> syncAppendEntriesReplyFromRpcMsg2 ",
pMsg2);
syncAppendEntriesReplyDestroy(pMsg);
syncAppendEntriesReplyDestroy(pMsg2);
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SyncAppendEntries *createMsg() {
SyncAppendEntries *pMsg = syncAppendEntriesBuild(20);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->prevLogIndex = 11;
pMsg->prevLogTerm = 22;
pMsg->commitIndex = 33;
strcpy(pMsg->data, "hello world");
return pMsg;
}
void test1() {
SyncAppendEntries *pMsg = createMsg();
syncAppendEntriesPrint2((char *)"test1:", pMsg);
syncAppendEntriesDestroy(pMsg);
}
void test2() {
SyncAppendEntries *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)malloc(len);
syncAppendEntriesSerialize(pMsg, serialized, len);
SyncAppendEntries *pMsg2 = syncAppendEntriesBuild(pMsg->dataLen);
syncAppendEntriesDeserialize(serialized, len, pMsg2);
syncAppendEntriesPrint2((char *)"test2: syncAppendEntriesSerialize -> syncAppendEntriesDeserialize ", pMsg2);
free(serialized);
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
}
void test3() {
SyncAppendEntries *pMsg = createMsg();
uint32_t len;
char * serialized = syncAppendEntriesSerialize2(pMsg, &len);
SyncAppendEntries *pMsg2 = syncAppendEntriesDeserialize2(serialized, len);
syncAppendEntriesPrint2((char *)"test3: syncAppendEntriesSerialize3 -> syncAppendEntriesDeserialize2 ", pMsg2);
free(serialized);
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
}
void test4() {
SyncAppendEntries *pMsg = createMsg();
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
SyncAppendEntries *pMsg2 = (SyncAppendEntries *)malloc(rpcMsg.contLen);
syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2);
syncAppendEntriesPrint2((char *)"test4: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg ", pMsg2);
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
}
void test5() {
SyncAppendEntries *pMsg = createMsg();
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
SyncAppendEntries *pMsg2 = syncAppendEntriesFromRpcMsg2(&rpcMsg);
syncAppendEntriesPrint2((char *)"test5: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg2 ", pMsg2);
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SyncClientRequest *createMsg() {
SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.msgType = 12345;
rpcMsg.contLen = 20;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
strcpy((char *)rpcMsg.pCont, "hello rpc");
SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true);
return pMsg;
}
void test1() {
SyncClientRequest *pMsg = createMsg();
syncClientRequestPrint2((char *)"test1:", pMsg);
syncClientRequestDestroy(pMsg);
}
void test2() {
SyncClientRequest *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)malloc(len);
syncClientRequestSerialize(pMsg, serialized, len);
SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen);
syncClientRequestDeserialize(serialized, len, pMsg2);
syncClientRequestPrint2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2);
free(serialized);
syncClientRequestDestroy(pMsg);
syncClientRequestDestroy(pMsg2);
}
void test3() {
SyncClientRequest *pMsg = createMsg();
uint32_t len;
char * serialized = syncClientRequestSerialize2(pMsg, &len);
SyncClientRequest *pMsg2 = syncClientRequestDeserialize2(serialized, len);
syncClientRequestPrint2((char *)"test3: syncClientRequestSerialize3 -> syncClientRequestDeserialize2 ", pMsg2);
free(serialized);
syncClientRequestDestroy(pMsg);
syncClientRequestDestroy(pMsg2);
}
void test4() {
SyncClientRequest *pMsg = createMsg();
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
SyncClientRequest *pMsg2 = (SyncClientRequest *)malloc(rpcMsg.contLen);
syncClientRequestFromRpcMsg(&rpcMsg, pMsg2);
syncClientRequestPrint2((char *)"test4: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg ", pMsg2);
syncClientRequestDestroy(pMsg);
syncClientRequestDestroy(pMsg2);
}
void test5() {
SyncClientRequest *pMsg = createMsg();
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg);
syncClientRequestPrint2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2);
syncClientRequestDestroy(pMsg);
syncClientRequestDestroy(pMsg2);
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
#define PING_MSG_LEN 20
#define APPEND_ENTRIES_VALUE_LEN 32
void test1() {
sTrace("test1: ---- syncPingSerialize, syncPingDeserialize");
char msg[PING_MSG_LEN];
snprintf(msg, sizeof(msg), "%s", "test ping");
SyncPing* pMsg = syncPingBuild(PING_MSG_LEN);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222);
pMsg->destId.vgId = 100;
memcpy(pMsg->data, msg, PING_MSG_LEN);
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
uint32_t bufLen = pMsg->bytes;
char* buf = (char*)malloc(bufLen);
syncPingSerialize(pMsg, buf, bufLen);
SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes);
syncPingDeserialize(buf, bufLen, pMsg2);
{
cJSON* pJson = syncPing2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncPingDestroy(pMsg);
syncPingDestroy(pMsg2);
free(buf);
}
void test2() {
sTrace("test2: ---- syncPing2RpcMsg, syncPingFromRpcMsg");
char msg[PING_MSG_LEN];
snprintf(msg, sizeof(msg), "%s", "hello raft");
SyncPing* pMsg = syncPingBuild(PING_MSG_LEN);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 3333);
pMsg->srcId.vgId = 200;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 4444);
pMsg->destId.vgId = 200;
memcpy(pMsg->data, msg, PING_MSG_LEN);
{
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes);
syncPingFromRpcMsg(&rpcMsg, pMsg2);
rpcFreeCont(rpcMsg.pCont);
{
cJSON* pJson = syncPing2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncPingDestroy(pMsg);
syncPingDestroy(pMsg2);
}
void test3() {
sTrace("test3: ---- syncPingReplySerialize, syncPingReplyDeserialize");
char msg[PING_MSG_LEN];
snprintf(msg, sizeof(msg), "%s", "test ping");
SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 5555);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 6666);
pMsg->destId.vgId = 100;
memcpy(pMsg->data, msg, PING_MSG_LEN);
{
cJSON* pJson = syncPingReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
uint32_t bufLen = pMsg->bytes;
char* buf = (char*)malloc(bufLen);
syncPingReplySerialize(pMsg, buf, bufLen);
SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes);
syncPingReplyDeserialize(buf, bufLen, pMsg2);
{
cJSON* pJson = syncPingReply2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncPingReplyDestroy(pMsg);
syncPingReplyDestroy(pMsg2);
free(buf);
}
void test4() {
sTrace("test4: ---- syncPingReply2RpcMsg, syncPingReplyFromRpcMsg");
char msg[PING_MSG_LEN];
snprintf(msg, sizeof(msg), "%s", "hello raft");
SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 7777);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 8888);
pMsg->destId.vgId = 100;
memcpy(pMsg->data, msg, PING_MSG_LEN);
{
cJSON* pJson = syncPingReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg);
SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes);
syncPingReplyFromRpcMsg(&rpcMsg, pMsg2);
rpcFreeCont(rpcMsg.pCont);
{
cJSON* pJson = syncPingReply2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncPingReplyDestroy(pMsg);
syncPingReplyDestroy(pMsg2);
}
void test5() {
sTrace("test5: ---- syncRequestVoteSerialize, syncRequestVoteDeserialize");
SyncRequestVote* pMsg = syncRequestVoteBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678);
pMsg->destId.vgId = 100;
pMsg->currentTerm = 20;
pMsg->lastLogIndex = 21;
pMsg->lastLogTerm = 22;
{
cJSON* pJson = syncRequestVote2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
uint32_t bufLen = pMsg->bytes;
char* buf = (char*)malloc(bufLen);
syncRequestVoteSerialize(pMsg, buf, bufLen);
SyncRequestVote* pMsg2 = (SyncRequestVote*)malloc(pMsg->bytes);
syncRequestVoteDeserialize(buf, bufLen, pMsg2);
{
cJSON* pJson = syncRequestVote2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncRequestVoteDestroy(pMsg);
syncRequestVoteDestroy(pMsg2);
free(buf);
}
void test6() {
sTrace("test6: ---- syncRequestVote2RpcMsg, syncRequestVoteFromRpcMsg");
SyncRequestVote* pMsg = syncRequestVoteBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678);
pMsg->destId.vgId = 100;
pMsg->currentTerm = 20;
pMsg->lastLogIndex = 21;
pMsg->lastLogTerm = 22;
{
cJSON* pJson = syncRequestVote2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
SyncRequestVote* pMsg2 = (SyncRequestVote*)malloc(pMsg->bytes);
syncRequestVoteFromRpcMsg(&rpcMsg, pMsg2);
rpcFreeCont(rpcMsg.pCont);
{
cJSON* pJson = syncRequestVote2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncRequestVoteDestroy(pMsg);
syncRequestVoteDestroy(pMsg2);
}
void test7() {
sTrace("test7: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize");
SyncRequestVoteReply* pMsg = SyncRequestVoteReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 20;
pMsg->voteGranted = 1;
{
cJSON* pJson = syncRequestVoteReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
uint32_t bufLen = pMsg->bytes;
char* buf = (char*)malloc(bufLen);
syncRequestVoteReplySerialize(pMsg, buf, bufLen);
SyncRequestVoteReply* pMsg2 = (SyncRequestVoteReply*)malloc(pMsg->bytes);
syncRequestVoteReplyDeserialize(buf, bufLen, pMsg2);
{
cJSON* pJson = syncRequestVoteReply2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncRequestVoteReplyDestroy(pMsg);
syncRequestVoteReplyDestroy(pMsg2);
free(buf);
}
void test8() {
sTrace("test8: ---- syncRequestVoteReply2RpcMsg, syncRequestVoteReplyFromRpcMsg");
SyncRequestVoteReply* pMsg = SyncRequestVoteReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("8.8.8.8", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 20;
pMsg->voteGranted = 1;
{
cJSON* pJson = syncRequestVoteReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg);
SyncRequestVoteReply* pMsg2 = (SyncRequestVoteReply*)malloc(pMsg->bytes);
syncRequestVoteReplyFromRpcMsg(&rpcMsg, pMsg2);
rpcFreeCont(rpcMsg.pCont);
{
cJSON* pJson = syncRequestVoteReply2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncRequestVoteReplyDestroy(pMsg);
syncRequestVoteReplyDestroy(pMsg2);
}
void test9() {
sTrace("test9: ---- syncAppendEntriesSerialize, syncAppendEntriesDeserialize");
char msg[APPEND_ENTRIES_VALUE_LEN];
snprintf(msg, sizeof(msg), "%s", "test value");
SyncAppendEntries* pMsg = syncAppendEntriesBuild(APPEND_ENTRIES_VALUE_LEN);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222);
pMsg->destId.vgId = 100;
pMsg->prevLogIndex = 55;
pMsg->prevLogTerm = 66;
pMsg->commitIndex = 77;
memcpy(pMsg->data, msg, APPEND_ENTRIES_VALUE_LEN);
{
cJSON* pJson = syncAppendEntries2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
uint32_t bufLen = pMsg->bytes;
char* buf = (char*)malloc(bufLen);
syncAppendEntriesSerialize(pMsg, buf, bufLen);
SyncAppendEntries* pMsg2 = (SyncAppendEntries*)malloc(pMsg->bytes);
syncAppendEntriesDeserialize(buf, bufLen, pMsg2);
{
cJSON* pJson = syncAppendEntries2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
free(buf);
}
void test10() {
sTrace("test10: ---- syncAppendEntries2RpcMsg, syncAppendEntriesFromRpcMsg");
char msg[APPEND_ENTRIES_VALUE_LEN];
snprintf(msg, sizeof(msg), "%s", "test value");
SyncAppendEntries* pMsg = syncAppendEntriesBuild(APPEND_ENTRIES_VALUE_LEN);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222);
pMsg->destId.vgId = 100;
pMsg->prevLogIndex = 55;
pMsg->prevLogTerm = 66;
pMsg->commitIndex = 77;
memcpy(pMsg->data, msg, APPEND_ENTRIES_VALUE_LEN);
{
cJSON* pJson = syncAppendEntries2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
SyncAppendEntries* pMsg2 = (SyncAppendEntries*)malloc(pMsg->bytes);
syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2);
rpcFreeCont(rpcMsg.pCont);
{
cJSON* pJson = syncAppendEntries2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
}
void test11() {
sTrace("test11: ---- syncAppendEntriesReplySerialize, syncAppendEntriesReplyDeserialize");
SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222);
pMsg->destId.vgId = 100;
pMsg->success = 1;
pMsg->matchIndex = 23;
{
cJSON* pJson = syncAppendEntriesReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
uint32_t bufLen = pMsg->bytes;
char* buf = (char*)malloc(bufLen);
syncAppendEntriesReplySerialize(pMsg, buf, bufLen);
SyncAppendEntriesReply* pMsg2 = (SyncAppendEntriesReply*)malloc(pMsg->bytes);
syncAppendEntriesReplyDeserialize(buf, bufLen, pMsg2);
{
cJSON* pJson = syncAppendEntriesReply2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncAppendEntriesReplyDestroy(pMsg);
syncAppendEntriesReplyDestroy(pMsg2);
free(buf);
}
void test12() {
sTrace("test12: ---- syncAppendEntriesReply2RpcMsg, syncAppendEntriesReplyFromRpcMsg");
SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1111);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 2222);
pMsg->destId.vgId = 100;
pMsg->success = 1;
pMsg->matchIndex = 23;
{
cJSON* pJson = syncAppendEntriesReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg);
SyncAppendEntriesReply* pMsg2 = (SyncAppendEntriesReply*)malloc(pMsg->bytes);
syncAppendEntriesReplyFromRpcMsg(&rpcMsg, pMsg2);
rpcFreeCont(rpcMsg.pCont);
{
cJSON* pJson = syncAppendEntriesReply2Json(pMsg2);
char* serialized = cJSON_Print(pJson);
printf("\n%s\n\n", serialized);
free(serialized);
cJSON_Delete(pJson);
}
syncAppendEntriesReplyDestroy(pMsg);
syncAppendEntriesReplyDestroy(pMsg2);
}
int main() {
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
test1();
test2();
test3();
test4();
test5();
test6();
test7();
test8();
test9();
test10();
test11();
test12();
return 0;
}
......@@ -89,7 +89,7 @@ int main(int argc, char** argv) {
SSyncNode* pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
syncNodePrint((char*)"syncInitTest", pSyncNode);
syncNodePrint2((char*)"syncInitTest", pSyncNode);
initRaftId(pSyncNode);
......
......@@ -83,7 +83,7 @@ SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() {
logStorePrint(pSyncNode->pLogStore);
for (int i = 0; i < 5; ++i) {
int32_t dataLen = 10;
int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL);
pEntry->msgType = 1;
......@@ -94,7 +94,7 @@ void logStoreTest() {
pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
snprintf(pEntry->data, dataLen, "value%d", i);
//syncEntryPrint2((char*)"write entry:", pEntry);
// syncEntryPrint2((char*)"write entry:", pEntry);
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
syncEntryDestory(pEntry);
}
......@@ -132,8 +132,8 @@ int main(int argc, char** argv) {
pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
//syncNodePrint((char*)"syncLogStoreTest", pSyncNode);
//initRaftId(pSyncNode);
// syncNodePrint((char*)"syncLogStoreTest", pSyncNode);
// initRaftId(pSyncNode);
logStoreTest();
......
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SyncPingReply *createMsg() {
SRaftId srcId, destId;
srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
srcId.vgId = 100;
destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
destId.vgId = 100;
SyncPingReply *pMsg = syncPingReplyBuild3(&srcId, &destId);
return pMsg;
}
void test1() {
SyncPingReply *pMsg = createMsg();
syncPingReplyPrint2((char *)"test1:", pMsg);
syncPingReplyDestroy(pMsg);
}
void test2() {
SyncPingReply *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)malloc(len);
syncPingReplySerialize(pMsg, serialized, len);
SyncPingReply *pMsg2 = syncPingReplyBuild(pMsg->dataLen);
syncPingReplyDeserialize(serialized, len, pMsg2);
syncPingReplyPrint2((char *)"test2: syncPingReplySerialize -> syncPingReplyDeserialize ", pMsg2);
free(serialized);
syncPingReplyDestroy(pMsg);
syncPingReplyDestroy(pMsg2);
}
void test3() {
SyncPingReply *pMsg = createMsg();
uint32_t len;
char * serialized = syncPingReplySerialize2(pMsg, &len);
SyncPingReply *pMsg2 = syncPingReplyDeserialize2(serialized, len);
syncPingReplyPrint2((char *)"test3: syncPingReplySerialize3 -> syncPingReplyDeserialize2 ", pMsg2);
free(serialized);
syncPingReplyDestroy(pMsg);
syncPingReplyDestroy(pMsg2);
}
void test4() {
SyncPingReply *pMsg = createMsg();
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg);
SyncPingReply *pMsg2 = (SyncPingReply *)malloc(rpcMsg.contLen);
syncPingReplyFromRpcMsg(&rpcMsg, pMsg2);
syncPingReplyPrint2((char *)"test4: syncPingReply2RpcMsg -> syncPingReplyFromRpcMsg ", pMsg2);
syncPingReplyDestroy(pMsg);
syncPingReplyDestroy(pMsg2);
}
void test5() {
SyncPingReply *pMsg = createMsg();
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg);
SyncPingReply *pMsg2 = syncPingReplyFromRpcMsg2(&rpcMsg);
syncPingReplyPrint2((char *)"test5: syncPingReply2RpcMsg -> syncPingReplyFromRpcMsg2 ", pMsg2);
syncPingReplyDestroy(pMsg);
syncPingReplyDestroy(pMsg2);
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
......@@ -15,116 +14,82 @@ void logTest() {
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SyncPing *createMsg() {
SRaftId srcId, destId;
srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
srcId.vgId = 100;
destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
destId.vgId = 100;
SyncPing *pMsg = syncPingBuild3(&srcId, &destId);
return pMsg;
}
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
void test1() {
SyncPing *pMsg = createMsg();
syncPingPrint2((char *)"test1:", pMsg);
syncPingDestroy(pMsg);
}
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
void test2() {
SyncPing *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)malloc(len);
syncPingSerialize(pMsg, serialized, len);
SyncPing *pMsg2 = syncPingBuild(pMsg->dataLen);
syncPingDeserialize(serialized, len, pMsg2);
syncPingPrint2((char *)"test2: syncPingSerialize -> syncPingDeserialize ", pMsg2);
free(serialized);
syncPingDestroy(pMsg);
syncPingDestroy(pMsg2);
}
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
void test3() {
SyncPing *pMsg = createMsg();
uint32_t len;
char * serialized = syncPingSerialize2(pMsg, &len);
SyncPing *pMsg2 = syncPingDeserialize2(serialized, len);
syncPingPrint2((char *)"test3: syncPingSerialize3 -> syncPingDeserialize2 ", pMsg2);
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
free(serialized);
syncPingDestroy(pMsg);
syncPingDestroy(pMsg2);
}
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
void test4() {
SyncPing *pMsg = createMsg();
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
SyncPing *pMsg2 = (SyncPing *)malloc(rpcMsg.contLen);
syncPingFromRpcMsg(&rpcMsg, pMsg2);
syncPingPrint2((char *)"test4: syncPing2RpcMsg -> syncPingFromRpcMsg ", pMsg2);
return pSyncNode;
syncPingDestroy(pMsg);
syncPingDestroy(pMsg2);
}
SSyncNode* syncInitTest() { return syncNodeInit(); }
void test5() {
SyncPing *pMsg = createMsg();
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
SyncPing *pMsg2 = syncPingFromRpcMsg2(&rpcMsg);
syncPingPrint2((char *)"test5: syncPing2RpcMsg -> syncPingFromRpcMsg2 ", pMsg2);
void initRaftId(SSyncNode* pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char* s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
syncPingDestroy(pMsg);
syncPingDestroy(pMsg2);
}
int main(int argc, char** argv) {
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
syncNodePrint((char*)"----1", pSyncNode);
initRaftId(pSyncNode);
//---------------------------
sTrace("syncNodeStartPingTimer ...");
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint((char*)"----2", pSyncNode);
sTrace("sleep ...");
taosMsleep(10000);
sTrace("syncNodeStopPingTimer ...");
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint((char*)"----3", pSyncNode);
sTrace("sleep ...");
taosMsleep(5000);
sTrace("syncNodeStartPingTimer ...");
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint((char*)"----4", pSyncNode);
sTrace("sleep ...");
taosMsleep(10000);
sTrace("syncNodeStopPingTimer ...");
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint((char*)"----5", pSyncNode);
while (1) {
sTrace("while 1 sleep ...");
taosMsleep(1000);
}
test1();
test2();
test3();
test4();
test5();
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 3;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
SSyncFSM* pFsm;
SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = replicaNum;
for (int i = 0; i < replicaNum; ++i) {
pCfg->nodeInfo[i].nodePort = ports[i];
snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
}
SSyncNode* syncInitTest() { return syncNodeInit(); }
void initRaftId(SSyncNode* pSyncNode) {
for (int i = 0; i < replicaNum; ++i) {
ids[i] = pSyncNode->replicasId[i];
char* s = syncUtilRaftId2Str(&ids[i]);
printf("raftId[%d] : %s\n", i, s);
free(s);
}
}
int main(int argc, char** argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
SSyncNode* pSyncNode = syncInitTest();
assert(pSyncNode != NULL);
syncNodePrint2((char*)"----1", pSyncNode);
initRaftId(pSyncNode);
//---------------------------
sTrace("syncNodeStartPingTimer ...");
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint2((char*)"----2", pSyncNode);
sTrace("sleep ...");
taosMsleep(10000);
sTrace("syncNodeStopPingTimer ...");
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint2((char*)"----3", pSyncNode);
sTrace("sleep ...");
taosMsleep(5000);
sTrace("syncNodeStartPingTimer ...");
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint2((char*)"----4", pSyncNode);
sTrace("sleep ...");
taosMsleep(10000);
sTrace("syncNodeStopPingTimer ...");
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
syncNodePrint2((char*)"----5", pSyncNode);
while (1) {
sTrace("while 1 sleep ...");
taosMsleep(1000);
}
return 0;
}
......@@ -22,15 +22,21 @@ int main() {
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
assert(pRaftStore != NULL);
raftStorePrint(pRaftStore);
#if 0
pRaftStore->currentTerm = 100;
pRaftStore->voteFor.addr = 200;
pRaftStore->voteFor.vgId = 300;
raftStorePersist(pRaftStore);
raftStorePrint(pRaftStore);
#endif
++(pRaftStore->currentTerm);
++(pRaftStore->voteFor.addr);
++(pRaftStore->voteFor.vgId);
raftStorePersist(pRaftStore);
raftStorePrint(pRaftStore);
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SyncRequestVoteReply *createMsg() {
SyncRequestVoteReply *pMsg = syncRequestVoteReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 77;
pMsg->voteGranted = true;
return pMsg;
}
void test1() {
SyncRequestVoteReply *pMsg = createMsg();
syncRequestVoteReplyPrint2((char *)"test1:", pMsg);
syncRequestVoteReplyDestroy(pMsg);
}
void test2() {
SyncRequestVoteReply *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)malloc(len);
syncRequestVoteReplySerialize(pMsg, serialized, len);
SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyBuild();
syncRequestVoteReplyDeserialize(serialized, len, pMsg2);
syncRequestVoteReplyPrint2((char *)"test2: syncRequestVoteReplySerialize -> syncRequestVoteReplyDeserialize ", pMsg2);
free(serialized);
syncRequestVoteReplyDestroy(pMsg);
syncRequestVoteReplyDestroy(pMsg2);
}
void test3() {
SyncRequestVoteReply *pMsg = createMsg();
uint32_t len;
char * serialized = syncRequestVoteReplySerialize2(pMsg, &len);
SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyDeserialize2(serialized, len);
syncRequestVoteReplyPrint2((char *)"test3: syncRequestVoteReplySerialize3 -> syncRequestVoteReplyDeserialize2 ",
pMsg2);
free(serialized);
syncRequestVoteReplyDestroy(pMsg);
syncRequestVoteReplyDestroy(pMsg2);
}
void test4() {
SyncRequestVoteReply *pMsg = createMsg();
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg);
SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyBuild();
syncRequestVoteReplyFromRpcMsg(&rpcMsg, pMsg2);
syncRequestVoteReplyPrint2((char *)"test4: syncRequestVoteReply2RpcMsg -> syncRequestVoteReplyFromRpcMsg ", pMsg2);
syncRequestVoteReplyDestroy(pMsg);
syncRequestVoteReplyDestroy(pMsg2);
}
void test5() {
SyncRequestVoteReply *pMsg = createMsg();
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg);
SyncRequestVoteReply *pMsg2 = syncRequestVoteReplyFromRpcMsg2(&rpcMsg);
syncRequestVoteReplyPrint2((char *)"test5: syncRequestVoteReply2RpcMsg -> syncRequestVoteReplyFromRpcMsg2 ", pMsg2);
syncRequestVoteReplyDestroy(pMsg);
syncRequestVoteReplyDestroy(pMsg2);
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
SyncRequestVote *createMsg() {
SyncRequestVote *pMsg = syncRequestVoteBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->currentTerm = 11;
pMsg->lastLogIndex = 22;
pMsg->lastLogTerm = 33;
return pMsg;
}
void test1() {
SyncRequestVote *pMsg = createMsg();
syncRequestVotePrint2((char *)"test1:", pMsg);
syncRequestVoteDestroy(pMsg);
}
void test2() {
SyncRequestVote *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)malloc(len);
syncRequestVoteSerialize(pMsg, serialized, len);
SyncRequestVote *pMsg2 = syncRequestVoteBuild();
syncRequestVoteDeserialize(serialized, len, pMsg2);
syncRequestVotePrint2((char *)"test2: syncRequestVoteSerialize -> syncRequestVoteDeserialize ", pMsg2);
free(serialized);
syncRequestVoteDestroy(pMsg);
syncRequestVoteDestroy(pMsg2);
}
void test3() {
SyncRequestVote *pMsg = createMsg();
uint32_t len;
char * serialized = syncRequestVoteSerialize2(pMsg, &len);
SyncRequestVote *pMsg2 = syncRequestVoteDeserialize2(serialized, len);
syncRequestVotePrint2((char *)"test3: syncRequestVoteSerialize3 -> syncRequestVoteDeserialize2 ", pMsg2);
free(serialized);
syncRequestVoteDestroy(pMsg);
syncRequestVoteDestroy(pMsg2);
}
void test4() {
SyncRequestVote *pMsg = createMsg();
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
SyncRequestVote *pMsg2 = syncRequestVoteBuild();
syncRequestVoteFromRpcMsg(&rpcMsg, pMsg2);
syncRequestVotePrint2((char *)"test4: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg ", pMsg2);
syncRequestVoteDestroy(pMsg);
syncRequestVoteDestroy(pMsg2);
}
void test5() {
SyncRequestVote *pMsg = createMsg();
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
SyncRequestVote *pMsg2 = syncRequestVoteFromRpcMsg2(&rpcMsg);
syncRequestVotePrint2((char *)"test5: syncRequestVote2RpcMsg -> syncRequestVoteFromRpcMsg2 ", pMsg2);
syncRequestVoteDestroy(pMsg);
syncRequestVoteDestroy(pMsg2);
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int gg = 0;
SyncTimeout *createSyncTimeout() {
SyncTimeout *pMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, 999, 333, &gg);
return pMsg;
}
SyncPing *createSyncPing() {
SRaftId srcId, destId;
srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
srcId.vgId = 100;
destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
destId.vgId = 100;
SyncPing *pMsg = syncPingBuild3(&srcId, &destId);
return pMsg;
}
SyncPingReply *createSyncPingReply() {
SRaftId srcId, destId;
srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
srcId.vgId = 100;
destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
destId.vgId = 100;
SyncPingReply *pMsg = syncPingReplyBuild3(&srcId, &destId);
return pMsg;
}
SyncClientRequest *createSyncClientRequest() {
SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.msgType = 12345;
rpcMsg.contLen = 20;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
strcpy((char *)rpcMsg.pCont, "hello rpc");
SyncClientRequest *pMsg = syncClientRequestBuild2(&rpcMsg, 123, true);
return pMsg;
}
SyncRequestVote *createSyncRequestVote() {
SyncRequestVote *pMsg = syncRequestVoteBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->currentTerm = 11;
pMsg->lastLogIndex = 22;
pMsg->lastLogTerm = 33;
return pMsg;
}
SyncRequestVoteReply *createSyncRequestVoteReply() {
SyncRequestVoteReply *pMsg = syncRequestVoteReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->term = 77;
pMsg->voteGranted = true;
return pMsg;
}
SyncAppendEntries *createSyncAppendEntries() {
SyncAppendEntries *pMsg = syncAppendEntriesBuild(20);
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->prevLogIndex = 11;
pMsg->prevLogTerm = 22;
pMsg->commitIndex = 33;
strcpy(pMsg->data, "hello world");
return pMsg;
}
SyncAppendEntriesReply *createSyncAppendEntriesReply() {
SyncAppendEntriesReply *pMsg = syncAppendEntriesReplyBuild();
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
pMsg->srcId.vgId = 100;
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
pMsg->destId.vgId = 100;
pMsg->success = true;
pMsg->matchIndex = 77;
return pMsg;
}
void test1() {
SyncTimeout *pMsg = createSyncTimeout();
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test1", &rpcMsg);
syncTimeoutDestroy(pMsg);
}
void test2() {
SyncPing *pMsg = createSyncPing();
SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test2", &rpcMsg);
syncPingDestroy(pMsg);
}
void test3() {
SyncPingReply *pMsg = createSyncPingReply();
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test3", &rpcMsg);
syncPingReplyDestroy(pMsg);
}
void test4() {
SyncRequestVote *pMsg = createSyncRequestVote();
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test4", &rpcMsg);
syncRequestVoteDestroy(pMsg);
}
void test5() {
SyncRequestVoteReply *pMsg = createSyncRequestVoteReply();
SRpcMsg rpcMsg;
syncRequestVoteReply2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test5", &rpcMsg);
syncRequestVoteReplyDestroy(pMsg);
}
void test6() {
SyncAppendEntries *pMsg = createSyncAppendEntries();
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test6", &rpcMsg);
syncAppendEntriesDestroy(pMsg);
}
void test7() {
SyncAppendEntriesReply *pMsg = createSyncAppendEntriesReply();
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test7", &rpcMsg);
syncAppendEntriesReplyDestroy(pMsg);
}
void test8() {
SyncClientRequest *pMsg = createSyncClientRequest();
SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
syncRpcMsgPrint2((char *)"test8", &rpcMsg);
syncClientRequestDestroy(pMsg);
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
test1();
test2();
test3();
test4();
test5();
test6();
test7();
test8();
return 0;
}
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
int gg = 0;
SyncTimeout *createMsg() {
SyncTimeout *pMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, 999, 333, &gg);
return pMsg;
}
void test1() {
SyncTimeout *pMsg = createMsg();
syncTimeoutPrint2((char *)"test1:", pMsg);
syncTimeoutDestroy(pMsg);
}
void test2() {
SyncTimeout *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)malloc(len);
syncTimeoutSerialize(pMsg, serialized, len);
SyncTimeout *pMsg2 = syncTimeoutBuild();
syncTimeoutDeserialize(serialized, len, pMsg2);
syncTimeoutPrint2((char *)"test2: syncTimeoutSerialize -> syncTimeoutDeserialize ", pMsg2);
free(serialized);
syncTimeoutDestroy(pMsg);
syncTimeoutDestroy(pMsg2);
}
void test3() {
SyncTimeout *pMsg = createMsg();
uint32_t len;
char * serialized = syncTimeoutSerialize2(pMsg, &len);
SyncTimeout *pMsg2 = syncTimeoutDeserialize2(serialized, len);
syncTimeoutPrint2((char *)"test3: syncTimeoutSerialize3 -> syncTimeoutDeserialize2 ", pMsg2);
free(serialized);
syncTimeoutDestroy(pMsg);
syncTimeoutDestroy(pMsg2);
}
void test4() {
SyncTimeout *pMsg = createMsg();
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pMsg, &rpcMsg);
SyncTimeout *pMsg2 = (SyncTimeout *)malloc(rpcMsg.contLen);
syncTimeoutFromRpcMsg(&rpcMsg, pMsg2);
syncTimeoutPrint2((char *)"test4: syncTimeout2RpcMsg -> syncTimeoutFromRpcMsg ", pMsg2);
syncTimeoutDestroy(pMsg);
syncTimeoutDestroy(pMsg2);
}
void test5() {
SyncTimeout *pMsg = createMsg();
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pMsg, &rpcMsg);
SyncTimeout *pMsg2 = syncTimeoutFromRpcMsg2(&rpcMsg);
syncTimeoutPrint2((char *)"test5: syncTimeout2RpcMsg -> syncTimeoutFromRpcMsg2 ", pMsg2);
syncTimeoutDestroy(pMsg);
syncTimeoutDestroy(pMsg2);
}
int main() {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
logTest();
test1();
test2();
test3();
test4();
test5();
return 0;
}
......@@ -118,7 +118,7 @@ int main(int argc, char** argv) {
}
for (int i = 0; i < replicaNum; ++i) {
SyncRequestVoteReply* reply = SyncRequestVoteReplyBuild();
SyncRequestVoteReply* reply = syncRequestVoteReplyBuild();
reply->destId = pSyncNode->myRaftId;
reply->srcId = ids[i];
reply->term = term;
......
......@@ -118,7 +118,7 @@ int main(int argc, char** argv) {
}
for (int i = 0; i < replicaNum; ++i) {
SyncRequestVoteReply* reply = SyncRequestVoteReplyBuild();
SyncRequestVoteReply* reply = syncRequestVoteReplyBuild();
reply->destId = pSyncNode->myRaftId;
reply->srcId = ids[i];
reply->term = term;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册