diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 57303303f1a7d78bf0614f2687298d31c5071bc2..edc01c9a05711648b819880a3936e3666fe07bc8 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -201,6 +201,43 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { return SYNC_TERM_INVALID; } +static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + + SyncIndex index = 0; + SWalSyncInfo syncMeta; + syncMeta.isWeek = pEntry->isWeak; + syncMeta.seqNum = pEntry->seqNum; + syncMeta.term = pEntry->term; + index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); + if (index < 0) { + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t sysErr = errno; + const char* sysErrStr = strerror(errno); + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", + pEntry->index, err, err, errStr, sysErr, sysErrStr); + syncNodeErrorLog(pData->pSyncNode, logBuf); + + ASSERT(0); + return -1; + } + pEntry->index = index; + + do { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + syncNodeEventLog(pData->pSyncNode, eventLog); + } while (0); + + return 0; +} + +#if 0 static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -243,6 +280,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr return code; } +#endif // entry found, return 0 // entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST @@ -361,6 +399,8 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp //------------------------------- // log[0 .. n] + +#if 0 int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -397,6 +437,44 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { return code; } +#endif + +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + + SyncIndex index = 0; + SWalSyncInfo syncMeta; + syncMeta.isWeek = pEntry->isWeak; + syncMeta.seqNum = pEntry->seqNum; + syncMeta.term = pEntry->term; + + index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); + if (index < 0) { + int32_t err = terrno; + const char* errStr = tstrerror(err); + int32_t sysErr = errno; + const char* sysErrStr = strerror(errno); + + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "wal write error, index:%" PRId64 ", err:%d %X, msg:%s, syserr:%d, sysmsg:%s", + pEntry->index, err, err, errStr, sysErr, sysErrStr); + syncNodeErrorLog(pData->pSyncNode, logBuf); + + ASSERT(0); + return -1; + } + pEntry->index = index; + + do { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "write2 index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + syncNodeEventLog(pData->pSyncNode, eventLog); + } while (0); + + return 0; +} SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data;