diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index d767b3521eddb8a57d8bb0d27c4d73abd484afd8..aec8a1f73e3d5d8f6c78539c38adb6ace6ad6492 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -210,7 +210,7 @@ SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); -int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); +int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 7e956237401bed6140201ff84427e9cda2aa9e8b..cd2c2d4a4f8b708246786fa1ea0cf030d5799593 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -238,7 +238,7 @@ typedef struct SyncClientRequestBatch { char data[]; // block2, block3 } SyncClientRequestBatch; -SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, int32_t vgId); void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg); void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index b802d94bea2b65deb73594dcebfc8a393cb437d0..586cfc0f15a00fdb567186a40f7b215842a5c927 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -170,7 +170,7 @@ void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); -int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); +int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize); // option bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ceca20150603d9079187b7ece312aa14f4b31f2a..ef9cf1fe8fe606aad33454fa7ef6facf09d0dd22 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -677,7 +677,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { return ret; } -int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { +int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) { if (arrSize < 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; @@ -690,18 +690,18 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_ } ASSERT(rid == pSyncNode->rid); - int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize); + int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgPArr, pIsWeakArr, arrSize); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return ret; } -static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) { +static bool syncNodeBatchOK(SRpcMsg** pMsgPArr, int32_t arrSize) { for (int32_t i = 0; i < arrSize; ++i) { - if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE) { + if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE) { return false; } - if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { + if (pMsgPArr[i]->msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) { return false; } } @@ -709,8 +709,8 @@ static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) { return true; } -int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { - if (!syncNodeBatchOK(pMsgArr, arrSize)) { +int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize) { + if (!syncNodeBatchOK(pMsgPArr, arrSize)) { syncNodeErrorLog(pSyncNode, "sync propose batch error"); terrno = TSDB_CODE_SYN_BATCH_ERROR; return -1; @@ -738,14 +738,14 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe for (int i = 0; i < arrSize; ++i) { SRespStub stub; stub.createTime = taosGetTimestampMs(); - stub.rpcMsg = pMsgArr[i]; + stub.rpcMsg = *(pMsgPArr[i]); uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub); raftArr[i].isWeak = pIsWeakArr[i]; raftArr[i].seqNum = seqNum; } - SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgArr, raftArr, arrSize, pSyncNode->vgId); + SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgPArr, raftArr, arrSize, pSyncNode->vgId); ASSERT(pSyncMsg != NULL); SRpcMsg rpcMsg; @@ -759,7 +759,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg); ASSERT(arrSize == pSyncMsg->dataCount); for (int i = 0; i < arrSize; ++i) { - pMsgArr[i].info.conn.applyIndex = msgArr[i].info.conn.applyIndex; + pMsgPArr[i]->info.conn.applyIndex = msgArr[i].info.conn.applyIndex; syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum); } @@ -860,7 +860,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { } else { ret = -1; terrno = TSDB_CODE_SYN_NOT_LEADER; - sError("vgId:%d, sync propose not leader, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state)); + sError("vgId:%d, sync propose not leader, %s, msgtype:%s,%d", pSyncNode->vgId, + syncUtilState2String(pSyncNode->state), TMSG_INFO(pMsg->msgType), pMsg->msgType); goto _END; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 42a3290d5ba2d2b0effb655c4aa66f5539e89b86..13adaf055c1c320f56337ea90537e592fafcbe89 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -963,9 +963,9 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) { // block2: SRaftMeta array // block3: rpc msg array (with pCont) -SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize, int32_t vgId) { - ASSERT(rpcMsgArr != NULL); + ASSERT(rpcMsgPArr != NULL); ASSERT(arrSize > 0); int32_t dataLen = 0; @@ -991,7 +991,7 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet raftMetaArr[i].seqNum = raftArr[i].seqNum; // init msgArr - msgArr[i] = rpcMsgArr[i]; + msgArr[i] = *(rpcMsgPArr[i]); } return pMsg; diff --git a/source/libs/sync/test/syncClientRequestBatchTest.cpp b/source/libs/sync/test/syncClientRequestBatchTest.cpp index ae74baeda4c1451628ad9bad5131668abd338a3d..84d037be01028167584bfcaf912fc090540c0c7e 100644 --- a/source/libs/sync/test/syncClientRequestBatchTest.cpp +++ b/source/libs/sync/test/syncClientRequestBatchTest.cpp @@ -28,12 +28,12 @@ SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) { } SyncClientRequestBatch *createMsg() { - SRpcMsg rpcMsgArr[5]; - memset(rpcMsgArr, 0, sizeof(rpcMsgArr)); + SRpcMsg *rpcMsgPArr[5]; + memset(rpcMsgPArr, 0, sizeof(rpcMsgPArr)); for (int32_t i = 0; i < 5; ++i) { SRpcMsg *pRpcMsg = createRpcMsg(i, 20); - rpcMsgArr[i] = *pRpcMsg; - taosMemoryFree(pRpcMsg); + rpcMsgPArr[i] = pRpcMsg; + //taosMemoryFree(pRpcMsg); } SRaftMeta raftArr[5]; @@ -43,7 +43,7 @@ SyncClientRequestBatch *createMsg() { raftArr[i].isWeak = i % 2; } - SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgArr, raftArr, 5, 1234); + SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgPArr, raftArr, 5, 1234); return pMsg; }