提交 88aef2d1 编写于 作者: M Minghao Li

sync refactor

上级 73d768d2
...@@ -42,6 +42,12 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId ...@@ -42,6 +42,12 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId
cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj);
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj);
void syncIndexMgrLog(SSyncIndexMgr *pObj);
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -211,11 +211,14 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); ...@@ -211,11 +211,14 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
// for debug char* syncNode2Str(const SSyncNode* pSyncNode);
cJSON* syncNode2Json(const SSyncNode* pSyncNode);
char* syncNode2Str(const SSyncNode* pSyncNode); // for debug --------------
void syncNodePrint(char* s, const SSyncNode* pSyncNode); void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
void syncNodeLog(SSyncNode* pObj);
void syncNodeLog2(char* s, SSyncNode* pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -48,10 +48,10 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); ...@@ -48,10 +48,10 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
char* syncEntry2Str(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry);
// for debug // for debug
void syncEntryPrint(const SSyncRaftEntry* pEntry); void syncEntryPrint(const SSyncRaftEntry* pObj);
void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry); void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj);
void syncEntryLog(const SSyncRaftEntry* pEntry); void syncEntryLog(const SSyncRaftEntry* pObj);
void syncEntryLog2(char* s, const SSyncRaftEntry* pEntry); void syncEntryLog2(char* s, const SSyncRaftEntry* pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -42,7 +42,12 @@ int32_t raftStoreClose(SRaftStore *pRaftStore); ...@@ -42,7 +42,12 @@ int32_t raftStoreClose(SRaftStore *pRaftStore);
int32_t raftStorePersist(SRaftStore *pRaftStore); int32_t raftStorePersist(SRaftStore *pRaftStore);
int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len);
int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len);
void raftStorePrint(SRaftStore *pRaftStore);
// for debug -------------------
void raftStorePrint(SRaftStore *pObj);
void raftStorePrint2(char *s, SRaftStore *pObj);
void raftStoreLog(SRaftStore *pObj);
void raftStoreLog2(char *s, SRaftStore *pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -48,6 +48,12 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); ...@@ -48,6 +48,12 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
cJSON * voteGranted2Json(SVotesGranted *pVotesGranted); cJSON * voteGranted2Json(SVotesGranted *pVotesGranted);
char * voteGranted2Str(SVotesGranted *pVotesGranted); char * voteGranted2Str(SVotesGranted *pVotesGranted);
// for debug -------------------
void voteGrantedPrint(SVotesGranted *pObj);
void voteGrantedPrint2(char *s, SVotesGranted *pObj);
void voteGrantedLog(SVotesGranted *pObj);
void voteGrantedLog2(char *s, SVotesGranted *pObj);
// SVotesRespond ----------------------------- // SVotesRespond -----------------------------
typedef struct SVotesRespond { typedef struct SVotesRespond {
SRaftId (*replicas)[TSDB_MAX_REPLICA]; SRaftId (*replicas)[TSDB_MAX_REPLICA];
...@@ -65,6 +71,12 @@ void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); ...@@ -65,6 +71,12 @@ void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term);
cJSON * votesRespond2Json(SVotesRespond *pVotesRespond); cJSON * votesRespond2Json(SVotesRespond *pVotesRespond);
char * votesRespond2Str(SVotesRespond *pVotesRespond); char * votesRespond2Str(SVotesRespond *pVotesRespond);
// for debug -------------------
void votesRespondPrint(SVotesRespond *pObj);
void votesRespondPrint2(char *s, SVotesRespond *pObj);
void votesRespondLog(SVotesRespond *pObj);
void votesRespondLog2(char *s, SVotesRespond *pObj);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -97,4 +97,31 @@ char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { ...@@ -97,4 +97,31 @@ char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
char * serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
}
// for debug -------------------
void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
printf("syncIndexMgrPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncIndexMgrLog(SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
char *serialized = syncIndexMgr2Str(pObj);
sTrace("syncIndexMgrLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
} }
\ No newline at end of file
...@@ -327,11 +327,31 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { ...@@ -327,11 +327,31 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return serialized; return serialized;
} }
void syncNodePrint(char* s, const SSyncNode* pSyncNode) { // for debug --------------
char* ss = syncNode2Str(pSyncNode); void syncNodePrint(SSyncNode* pObj) {
// sTrace("syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss); char* serialized = syncNode2Str(pObj);
fprintf(stderr, "syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss); printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized);
free(ss); fflush(NULL);
free(serialized);
}
void syncNodePrint2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncNodeLog(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncNodeLog2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
} }
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
......
...@@ -105,28 +105,28 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) { ...@@ -105,28 +105,28 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
} }
// for debug ----------- // for debug -----------
void syncEntryPrint(const SSyncRaftEntry* pEntry) { void syncEntryPrint(const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pEntry); char* serialized = syncEntry2Str(pObj);
printf("syncEntryPrint | len:%lu | %s \n", strlen(serialized), serialized); printf("syncEntryPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL); fflush(NULL);
free(serialized); free(serialized);
} }
void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) { void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pEntry); char* serialized = syncEntry2Str(pObj);
printf("syncEntryPrint | len:%lu | %s | %s \n", strlen(serialized), s, serialized); printf("syncEntryPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL); fflush(NULL);
free(serialized); free(serialized);
} }
void syncEntryLog(const SSyncRaftEntry* pEntry) { void syncEntryLog(const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pEntry); char* serialized = syncEntry2Str(pObj);
sTrace("syncEntryPrint | len:%lu | %s", strlen(serialized), serialized); sTrace("syncEntryLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized); free(serialized);
} }
void syncEntryLog2(char* s, const SSyncRaftEntry* pEntry) { void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) {
char* serialized = syncEntry2Str(pEntry); char* serialized = syncEntry2Str(pObj);
sTrace("syncEntryPrint | len:%lu | %s | %s", strlen(serialized), s, serialized); sTrace("syncEntryLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized); free(serialized);
} }
\ No newline at end of file
...@@ -135,8 +135,30 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { ...@@ -135,8 +135,30 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
return 0; return 0;
} }
void raftStorePrint(SRaftStore *pRaftStore) { // for debug -------------------
char storeBuf[RAFT_STORE_BLOCK_SIZE]; void raftStorePrint(SRaftStore *pObj) {
raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); char serialized[RAFT_STORE_BLOCK_SIZE];
printf("%s\n", storeBuf); raftStoreSerialize(pObj, serialized, sizeof(serialized));
printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
}
void raftStorePrint2(char *s, SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
}
void raftStoreLog(SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized);
fflush(NULL);
}
void raftStoreLog2(char *s, SRaftStore *pObj) {
char serialized[RAFT_STORE_BLOCK_SIZE];
raftStoreSerialize(pObj, serialized, sizeof(serialized));
sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
fflush(NULL);
} }
...@@ -119,6 +119,33 @@ char *voteGranted2Str(SVotesGranted *pVotesGranted) { ...@@ -119,6 +119,33 @@ char *voteGranted2Str(SVotesGranted *pVotesGranted) {
return serialized; return serialized;
} }
// for debug -------------------
void voteGrantedPrint(SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
printf("voteGrantedPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void voteGrantedPrint2(char *s, SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
printf("voteGrantedPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void voteGrantedLog(SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
sTrace("voteGrantedLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void voteGrantedLog2(char *s, SVotesGranted *pObj) {
char *serialized = voteGranted2Str(pObj);
sTrace("voteGrantedLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
// SVotesRespond ----------------------------- // SVotesRespond -----------------------------
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond)); SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond));
...@@ -210,4 +237,31 @@ char *votesRespond2Str(SVotesRespond *pVotesRespond) { ...@@ -210,4 +237,31 @@ char *votesRespond2Str(SVotesRespond *pVotesRespond) {
char * serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
}
// for debug -------------------
void votesRespondPrint(SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
printf("votesRespondPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void votesRespondPrint2(char *s, SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
printf("votesRespondPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void votesRespondLog(SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
sTrace("votesRespondLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void votesRespondLog2(char *s, SVotesRespond *pObj) {
char *serialized = votesRespond2Str(pObj);
sTrace("votesRespondLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
} }
\ No newline at end of file
...@@ -89,7 +89,7 @@ int main(int argc, char** argv) { ...@@ -89,7 +89,7 @@ int main(int argc, char** argv) {
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
syncNodePrint((char*)"syncInitTest", pSyncNode); syncNodePrint2((char*)"syncInitTest", pSyncNode);
initRaftId(pSyncNode); initRaftId(pSyncNode);
......
...@@ -83,7 +83,7 @@ SSyncNode* syncInitTest() { return syncNodeInit(); } ...@@ -83,7 +83,7 @@ SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() { void logStoreTest() {
logStorePrint(pSyncNode->pLogStore); logStorePrint(pSyncNode->pLogStore);
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
int32_t dataLen = 10; int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
assert(pEntry != NULL); assert(pEntry != NULL);
pEntry->msgType = 1; pEntry->msgType = 1;
...@@ -94,7 +94,7 @@ void logStoreTest() { ...@@ -94,7 +94,7 @@ void logStoreTest() {
pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
snprintf(pEntry->data, dataLen, "value%d", i); snprintf(pEntry->data, dataLen, "value%d", i);
//syncEntryPrint2((char*)"write entry:", pEntry); // syncEntryPrint2((char*)"write entry:", pEntry);
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
} }
...@@ -132,8 +132,8 @@ int main(int argc, char** argv) { ...@@ -132,8 +132,8 @@ int main(int argc, char** argv) {
pSyncNode = syncInitTest(); pSyncNode = syncInitTest();
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
//syncNodePrint((char*)"syncLogStoreTest", pSyncNode); // syncNodePrint((char*)"syncLogStoreTest", pSyncNode);
//initRaftId(pSyncNode); // initRaftId(pSyncNode);
logStoreTest(); logStoreTest();
......
...@@ -86,7 +86,7 @@ int main(int argc, char** argv) { ...@@ -86,7 +86,7 @@ int main(int argc, char** argv) {
SSyncNode* pSyncNode = syncInitTest(); SSyncNode* pSyncNode = syncInitTest();
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
syncNodePrint((char*)"----1", pSyncNode); syncNodePrint2((char*)"----1", pSyncNode);
initRaftId(pSyncNode); initRaftId(pSyncNode);
...@@ -95,7 +95,7 @@ int main(int argc, char** argv) { ...@@ -95,7 +95,7 @@ int main(int argc, char** argv) {
sTrace("syncNodeStartPingTimer ..."); sTrace("syncNodeStartPingTimer ...");
ret = syncNodeStartPingTimer(pSyncNode); ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0); assert(ret == 0);
syncNodePrint((char*)"----2", pSyncNode); syncNodePrint2((char*)"----2", pSyncNode);
sTrace("sleep ..."); sTrace("sleep ...");
taosMsleep(10000); taosMsleep(10000);
...@@ -103,7 +103,7 @@ int main(int argc, char** argv) { ...@@ -103,7 +103,7 @@ int main(int argc, char** argv) {
sTrace("syncNodeStopPingTimer ..."); sTrace("syncNodeStopPingTimer ...");
ret = syncNodeStopPingTimer(pSyncNode); ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0); assert(ret == 0);
syncNodePrint((char*)"----3", pSyncNode); syncNodePrint2((char*)"----3", pSyncNode);
sTrace("sleep ..."); sTrace("sleep ...");
taosMsleep(5000); taosMsleep(5000);
...@@ -111,7 +111,7 @@ int main(int argc, char** argv) { ...@@ -111,7 +111,7 @@ int main(int argc, char** argv) {
sTrace("syncNodeStartPingTimer ..."); sTrace("syncNodeStartPingTimer ...");
ret = syncNodeStartPingTimer(pSyncNode); ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0); assert(ret == 0);
syncNodePrint((char*)"----4", pSyncNode); syncNodePrint2((char*)"----4", pSyncNode);
sTrace("sleep ..."); sTrace("sleep ...");
taosMsleep(10000); taosMsleep(10000);
...@@ -119,7 +119,7 @@ int main(int argc, char** argv) { ...@@ -119,7 +119,7 @@ int main(int argc, char** argv) {
sTrace("syncNodeStopPingTimer ..."); sTrace("syncNodeStopPingTimer ...");
ret = syncNodeStopPingTimer(pSyncNode); ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0); assert(ret == 0);
syncNodePrint((char*)"----5", pSyncNode); syncNodePrint2((char*)"----5", pSyncNode);
while (1) { while (1) {
sTrace("while 1 sleep ..."); sTrace("while 1 sleep ...");
......
...@@ -22,15 +22,21 @@ int main() { ...@@ -22,15 +22,21 @@ int main() {
SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json");
assert(pRaftStore != NULL); assert(pRaftStore != NULL);
raftStorePrint(pRaftStore); raftStorePrint(pRaftStore);
#if 0
pRaftStore->currentTerm = 100; pRaftStore->currentTerm = 100;
pRaftStore->voteFor.addr = 200; pRaftStore->voteFor.addr = 200;
pRaftStore->voteFor.vgId = 300; pRaftStore->voteFor.vgId = 300;
raftStorePersist(pRaftStore);
raftStorePrint(pRaftStore); raftStorePrint(pRaftStore);
#endif
++(pRaftStore->currentTerm);
++(pRaftStore->voteFor.addr);
++(pRaftStore->voteFor.vgId);
raftStorePersist(pRaftStore); raftStorePersist(pRaftStore);
raftStorePrint(pRaftStore);
return 0; return 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册