diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c58cc3c7c5afcd5cfafb265e8bcf2ac6e3ae7452..a0f78d0d590c05938e046a6487655ac98d6b3647 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1234,20 +1234,20 @@ int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* int32_t tDeserializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); typedef struct { - int32_t vgVersion; - int32_t buffer; - int32_t pageSize; - int32_t pages; - int32_t cacheLastSize; - int32_t daysPerFile; - int32_t daysToKeep0; - int32_t daysToKeep1; - int32_t daysToKeep2; - int32_t walFsyncPeriod; - int8_t walLevel; - int8_t strict; - int8_t cacheLast; - int64_t reserved[8]; + int32_t vgVersion; + int32_t buffer; + int32_t pageSize; + int32_t pages; + int32_t cacheLastSize; + int32_t daysPerFile; + int32_t daysToKeep0; + int32_t daysToKeep1; + int32_t daysToKeep2; + int32_t walFsyncPeriod; + int8_t walLevel; + int8_t strict; + int8_t cacheLast; + int64_t reserved[8]; } SAlterVnodeConfigReq; int32_t tSerializeSAlterVnodeConfigReq(void* buf, int32_t bufLen, SAlterVnodeConfigReq* pReq); @@ -3065,7 +3065,7 @@ static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) { } static FORCE_INLINE void tDeleteSMqAskEpRsp(SMqAskEpRsp* pRsp) { - taosArrayDestroyEx(pRsp->topics, (void (*)(void*))tDeleteSMqSubTopicEp); + taosArrayDestroyEx(pRsp->topics, (FDelete)tDeleteSMqSubTopicEp); } #define TD_AUTO_CREATE_TABLE 0x1 diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0b7b80649e758e084b26c9f173038efd271723cf..6e3e274052d28d42733f4bf54da1761b7a6cd80e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1206,7 +1206,7 @@ CREATE_MSG_FAIL: return -1; } -bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { +bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { bool set = false; int32_t topicNumGet = taosArrayGetSize(pRsp->topics); @@ -1599,6 +1599,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg); /*tmqClearUnhandleMsg(tmq);*/ + tDeleteSMqAskEpRsp(rspMsg); *pReset = true; } else { *pReset = false; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 67d2ca78ee596cafee038175e666e86439523d5a..d682fa9cc58339e6e8d9a926a50e08d2cd9d42d2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -273,7 +273,17 @@ int32_t vmProcessAlterVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t vgId = alterReq.vgId; - dInfo("vgId:%d, start to alter vnode replica", alterReq.vgId); + dInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", alterReq.vgId, alterReq.replica, + alterReq.selfIndex, alterReq.strict); + for (int32_t i = 0; i < alterReq.replica; ++i) { + dInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port); + } + + if (alterReq.replica <= 0 || alterReq.selfIndex < 0 || alterReq.selfIndex >= alterReq.replica) { + terrno = TSDB_CODE_INVALID_MSG; + dError("vgId:%d, failed to alter replica since invalid msg", alterReq.vgId); + return -1; + } SVnodeObj *pVnode = vmAcquireVnode(pMgmt, vgId); if (pVnode == NULL) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 11f5a9f12afe34ee468eef9d5afadae3435f30ba..f46fc5d23ed72978753af6cc290e75d697df2206 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -341,6 +341,12 @@ static void *mndBuildAlterVnodeReplicaReq(SMnode *pMnode, SDbObj *pDb, SVgObj *p alterReq.selfIndex = v; } } + alterReq.replica = pVgroup->replica; + mInfo("vgId:%d, start to alter vnode, replica:%d selfIndex:%d strict:%d", alterReq.vgId, alterReq.replica, + alterReq.selfIndex, alterReq.strict); + for (int32_t i = 0; i < alterReq.replica; ++i) { + mInfo("vgId:%d, replica:%d ep:%s:%u", alterReq.vgId, i, alterReq.replicas[i].fqdn, alterReq.replicas[i].port); + } if (alterReq.selfIndex == -1) { terrno = TSDB_CODE_MND_APP_ERROR;