diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 811a7b8e999b3c287c210c70fb7a71ed9ec3b419..3aeb2d30b54552040bb47b19f6cc66ef24838964 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -45,6 +45,11 @@ // /\ UNCHANGED <> // void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { + if (pSyncNode == NULL) { + sError("pSyncNode is NULL"); + return; + } + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index"); return; @@ -172,6 +177,7 @@ static inline int64_t syncNodeAbs64(int64_t a, int64_t b) { int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { return pSyncNode->quorum; +#if 0 int32_t quorum = 1; // self int64_t timeNow = taosGetTimestampMs(); @@ -228,6 +234,7 @@ int32_t syncNodeDynamicQuorum(const SSyncNode* pSyncNode) { } return quorum; +#endif } /* diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 1f24c7c40367bbe8051bd7f90b5ae212a6366d57..09584077f199aca1278f38204bb483dd5120dd1a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -835,7 +835,9 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); } - pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps; + if (pEpSet->numOfEps > 0) { + pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps; + } sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -1438,12 +1440,13 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - syncNodeEventLog(pSyncNode, "sync close"); if (pSyncNode == NULL) { return; } int32_t ret; + syncNodeEventLog(pSyncNode, "sync close"); + ret = raftStoreClose(pSyncNode->pRaftStore); ASSERT(ret == 0); @@ -1879,6 +1882,10 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { } inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { + if (pSyncNode == NULL) { + return; + } + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); @@ -1954,6 +1961,10 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { } inline void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { + if (pSyncNode == NULL) { + return; + } + int32_t userStrLen = strlen(str); SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; @@ -2937,6 +2948,7 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); } + syncEntryDestory(pEntry); taosMemoryFree(serialized); syncClientRequestDestroy(pSyncMsg); @@ -3003,13 +3015,14 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { syncPingReply2RpcMsg(pMsgReply, &rpcMsg); /* - // htonl - SMsgHead* pHead = rpcMsg.pCont; - pHead->contLen = htonl(pHead->contLen); - pHead->vgId = htonl(pHead->vgId); - */ + // htonl + SMsgHead* pHead = rpcMsg.pCont; + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); +*/ syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + syncPingReplyDestroy(pMsgReply); return ret; } @@ -3058,6 +3071,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { // reply syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + syncHeartbeatReplyDestroy(pMsgReply); return 0; } @@ -3329,17 +3343,23 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde return 0; } - // advance commit index to sanpshot first - SSnapshot snapshot = {0}; - ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); - if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) { - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, - snapshot.lastApplyIndex); - syncNodeEventLog(ths, eventLog); + if (ths == NULL) { + return -1; + } + + if (ths->pFsm != NULL && ths->pFsm->FpGetSnapshotInfo != NULL) { + // advance commit index to sanpshot first + SSnapshot snapshot = {0}; + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); + if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex, + snapshot.lastApplyIndex); + syncNodeEventLog(ths, eventLog); - // update begin index - beginIndex = snapshot.lastApplyIndex + 1; + // update begin index + beginIndex = snapshot.lastApplyIndex + 1; + } } int32_t code = 0; @@ -3413,8 +3433,10 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // config change finish if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { - code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry); - ASSERT(code == 0); + if (rpcMsg.pCont != NULL) { + code = syncNodeConfigChangeFinish(ths, &rpcMsg, pEntry); + ASSERT(code == 0); + } } #if 0 @@ -3528,7 +3550,7 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) { for (int i = 0; i < pSyncNode->peersNum; ++i) { SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->peersId)[i]); - if (pSender->start) { + if (pSender != NULL && pSender->start) { sError("sync cannot change3"); return false; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 9de3fde3894f069afbe8152b646643a2b2a1fd25..4001a955fb01435db30073546c085a021a752964 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -411,32 +411,40 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { pMsg->bytes = bytes; if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { + taosMemoryFree(pMsg); return NULL; } uint32_t len; char* data = NULL; if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { + taosMemoryFree(pMsg); return NULL; } - ASSERT(len = pMsg->dataLen); + ASSERT(len == pMsg->dataLen); memcpy(pMsg->data, data, len); tEndDecode(&decoder); @@ -673,32 +681,40 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { pMsg->bytes = bytes; if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->msgType) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) { + taosMemoryFree(pMsg); return NULL; } if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) { + taosMemoryFree(pMsg); return NULL; } uint32_t len; char* data = NULL; if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { + taosMemoryFree(pMsg); return NULL; } - ASSERT(len = pMsg->dataLen); + ASSERT(len == pMsg->dataLen); memcpy(pMsg->data, data, len); tEndDecode(&decoder); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 940aaca055eab991087c7ab7591abc0f46e78ede..6d372acf2fd1b349e60309ecc0dcc87e995a8d01 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -532,7 +532,7 @@ int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry* pEntry = NULL; int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry); if (code == 1) { - *ppEntry = taosMemoryMalloc(pEntry->bytes); + *ppEntry = taosMemoryMalloc((int64_t)(pEntry->bytes)); memcpy(*ppEntry, pEntry, pEntry->bytes); (*ppEntry)->rid = -1; } else { diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 6f234631dae8620df80fe5aba0366a5cb8ad62c7..f152201901952fb57cc5a5c16ecfe3737184d039 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -209,7 +209,8 @@ bool syncUtilCanPrint(char c) { } char* syncUtilprintBin(char* ptr, uint32_t len) { - char* s = taosMemoryMalloc(len + 1); + int64_t memLen = (int64_t)(len + 1); + char* s = taosMemoryMalloc(memLen); ASSERT(s != NULL); memset(s, 0, len + 1); memcpy(s, ptr, len);