diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 6231cb83990f8a4d1920c2f70fbdebb1f6e70be6..cbe45e07d4fc51b8340e0a47cf1f177a06f2b661 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -83,7 +83,7 @@ typedef struct SyncPing { char data[]; } SyncPing; -#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) +//#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) SyncPing* syncPingBuild(uint32_t dataLen); void syncPingDestroy(SyncPing* pMsg); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index be25675db4a224344fd50defeed612b0d4efddec..e471488f5ce25ef4fcc332d7793df3636fcaa08b 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -46,8 +46,12 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry); -void syncEntryPrint(const SSyncRaftEntry* pEntry); -void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry); + +// for debug +void syncEntryPrint(const SSyncRaftEntry* pEntry); +void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry); +void syncEntryLog(const SSyncRaftEntry* pEntry); +void syncEntryLog2(char* s, const SSyncRaftEntry* pEntry); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index d59b3206b595752c0d3c045c6003f325f12de8b9..59b5fa94db355052db4737ec006565f58b18aa5a 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -32,39 +32,24 @@ typedef struct SSyncLogStoreData { SWal* pWal; } SSyncLogStoreData; -SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); - -void logStoreDestory(SSyncLogStore* pLogStore); - -// append one log entry -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); - -// get one log entry, user need to free pEntry->pCont +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); +void logStoreDestory(SSyncLogStore* pLogStore); +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); - -// truncate log with index, entries after the given index (>=index) will be deleted -int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); - -// return index of last entry -SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); - -// return term of last entry -SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); - -// update log store commit index with "index" -int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); - -// return commit index of log -SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); - +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); - -cJSON* logStore2Json(SSyncLogStore* pLogStore); - -char* logStore2Str(SSyncLogStore* pLogStore); +cJSON* logStore2Json(SSyncLogStore* pLogStore); +char* logStore2Str(SSyncLogStore* pLogStore); // for debug void logStorePrint(SSyncLogStore* pLogStore); +void logStorePrint2(char* s, SSyncLogStore* pLogStore); +void logStoreLog(SSyncLogStore* pLogStore); +void logStoreLog2(char* s, SSyncLogStore* pLogStore); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index baef49d748f54f3119bf7a6718394cfd8abf8de5..4afe14b47522790f30620142f0dc255993c1fb14 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -150,7 +150,7 @@ SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock // ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { - uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; + uint32_t bytes = sizeof(SyncPing) + dataLen; SyncPing* pMsg = malloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; @@ -173,7 +173,7 @@ void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) { void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { memcpy(pMsg, buf, len); assert(len == pMsg->bytes); - assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); + assert(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); } void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) { @@ -272,7 +272,7 @@ void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLe void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) { memcpy(pMsg, buf, len); assert(len == pMsg->bytes); - assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); + assert(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); } void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) { diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 959bf49ee7232bd931c29e5a01f35deaa6c9d868..d2feb98c90d345e8f4f3cbaec30d650e1df2a6e8 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -104,14 +104,29 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) { return serialized; } +// for debug ----------- void syncEntryPrint(const SSyncRaftEntry* pEntry) { - char* s = syncEntry2Str(pEntry); - sTrace("%s", s); - free(s); + char* serialized = syncEntry2Str(pEntry); + printf("syncEntryPrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); } void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) { - char* ss = syncEntry2Str(pEntry); - sTrace("%s | %s", s, ss); - free(ss); + char* serialized = syncEntry2Str(pEntry); + printf("syncEntryPrint | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncEntryLog(const SSyncRaftEntry* pEntry) { + char* serialized = syncEntry2Str(pEntry); + sTrace("syncEntryPrint | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncEntryLog2(char* s, const SSyncRaftEntry* pEntry) { + char* serialized = syncEntry2Str(pEntry); + sTrace("syncEntryPrint | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index d177d3ac9b8a3a66001453236e7309a747f1e802..27c8a26154e3ef11f5eca2fd118c51b00cc80c91 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -43,7 +43,6 @@ void logStoreDestory(SSyncLogStore* pLogStore) { } } -// append one log entry int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -61,7 +60,6 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { free(serialized); } -// get one log entry, user need to free pEntry->pCont SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -77,14 +75,12 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { return pEntry; } -// truncate log with index, entries after the given index (>=index) will be deleted int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; walRollback(pWal, fromIndex); } -// return index of last entry SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -92,7 +88,6 @@ SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { return lastIndex; } -// return term of last entry SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { SyncTerm lastTerm = 0; SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); @@ -103,14 +98,12 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { return lastTerm; } -// update log store commit index with "index" int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; walCommit(pWal, index); } -// return commit index of log SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; return pData->pSyncNode->commitIndex; @@ -163,11 +156,29 @@ char* logStore2Str(SSyncLogStore* pLogStore) { return serialized; } -// for debug +// for debug ----------------- void logStorePrint(SSyncLogStore* pLogStore) { - char* s = logStore2Str(pLogStore); - // sTrace("%s", s); - fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s); + char* serialized = logStore2Str(pLogStore); + printf("logStorePrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void logStorePrint2(char* s, SSyncLogStore* pLogStore) { + char* serialized = logStore2Str(pLogStore); + printf("logStorePrint | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} - free(s); +void logStoreLog(SSyncLogStore* pLogStore) { + char* serialized = logStore2Str(pLogStore); + sTrace("logStorePrint | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void logStoreLog2(char* s, SSyncLogStore* pLogStore) { + char* serialized = logStore2Str(pLogStore); + sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); } \ No newline at end of file