From f3c30e33fb4ae9f7f061e7525afd55c1d84938c7 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Feb 2022 18:29:00 +0800 Subject: [PATCH] minor changes --- source/dnode/mnode/impl/src/mndSubscribe.c | 74 +++++++++++----------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 2a3e0008a2..6afa1f4c79 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -12,8 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#define _DEFAULT_SOURCE +#define _DEFAULT_SOURCE #include "mndSubscribe.h" #include "mndConsumer.h" #include "mndDb.h" @@ -54,12 +54,12 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg); static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg); -static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, - const SMqConsumerEp *pConsumerEp); +static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, + const SMqConsumerEp *pConsumerEp); static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp); -static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub); +static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub); int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_SUBSCRIBE, @@ -232,22 +232,22 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { if (epoch != rsp.epoch) { mInfo("send new assignment to consumer, consumer epoch %d, server epoch %d", epoch, rsp.epoch); SArray *pTopics = pConsumer->currentTopics; - int sz = taosArrayGetSize(pTopics); + int32_t sz = taosArrayGetSize(pTopics); rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp)); - for (int i = 0; i < sz; i++) { + for (int32_t i = 0; i < sz; i++) { char *topicName = taosArrayGetP(pTopics, i); SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName); ASSERT(pSub); - int csz = taosArrayGetSize(pSub->consumers); + int32_t csz = taosArrayGetSize(pSub->consumers); // TODO: change to bsearch - for (int j = 0; j < csz; j++) { + for (int32_t j = 0; j < csz; j++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); if (consumerId == pSubConsumer->consumerId) { - int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); + int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); SMqSubTopicEp topicEp; strcpy(topicEp.topic, topicName); topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp)); - for (int k = 0; k < vgsz; k++) { + for (int32_t k = 0; k < vgsz; k++) { SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId}; @@ -276,7 +276,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { } static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { - int i = 0; + int32_t i = 0; while (key[i] != ':') { i++; } @@ -317,8 +317,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST); if (old == MQ_CONSUMER_STATUS__ACTIVE) { // get all topics of that topic - int sz = taosArrayGetSize(pConsumer->currentTopics); - for (int i = 0; i < sz; i++) { + int32_t sz = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < sz; i++) { char *topic = taosArrayGetP(pConsumer->currentTopics, i); char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); @@ -334,8 +334,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { } else { rebSubs = pConsumer->recentRemovedTopics; } - int sz = taosArrayGetSize(rebSubs); - for (int i = 0; i < sz; i++) { + int32_t sz = taosArrayGetSize(rebSubs); + for (int32_t i = 0; i < sz; i++) { char *topic = taosArrayGetP(rebSubs, i); char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic); SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); @@ -375,12 +375,12 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { mInfo("mq rebalance subscription: %s", pSub->key); // remove lost consumer - for (int i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) { int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i); mInfo("mq remove lost consumer %ld", lostConsumerId); - for (int j = 0; j < taosArrayGetSize(pSub->consumers); j++) { + for (int32_t j = 0; j < taosArrayGetSize(pSub->consumers); j++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j); if (pSubConsumer->consumerId == lostConsumerId) { taosArrayAddAll(pSub->unassignedVg, pSubConsumer->vgInfo); @@ -400,10 +400,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { int32_t imbalanceSolved = 0; // iterate all consumers, set unassignedVgStash - for (int i = 0; i < consumerNum; i++) { + for (int32_t i = 0; i < consumerNum; i++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); - int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); - int vgThisConsumerAfterRb; + int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); + int32_t vgThisConsumerAfterRb; if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; else @@ -442,10 +442,10 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { // assign to vgroup if (taosArrayGetSize(pSub->unassignedVg) != 0) { - for (int i = 0; i < consumerNum; i++) { + for (int32_t i = 0; i < consumerNum; i++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i); - int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); - int vgThisConsumerAfterRb; + int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo); + int32_t vgThisConsumerAfterRb; if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1; else @@ -602,7 +602,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { #if 0 //update consumer status for the subscribption - for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) { + for (int32_t i = 0; i < taosArrayGetSize(pSub->assigned); i++) { SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); int64_t consumerId = pCEp->consumerId; if (pCEp->status != -1) { @@ -619,7 +619,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { // TODO: swap with last one, reduce size and reset i taosArrayRemove(pSub->assigned, i); // remove from available consumer - for (int j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { + for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) { if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) { taosArrayRemove(pSub->availConsumer, j); break; @@ -699,7 +699,7 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) { } #endif -static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) { +static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); @@ -742,8 +742,8 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub return 0; } -static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, - const SMqConsumerEp *pConsumerEp) { +static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup, + const SMqConsumerEp *pConsumerEp) { ASSERT(pConsumerEp->oldConsumerId == -1); int32_t vgId = pConsumerEp->vgId; SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); @@ -890,7 +890,7 @@ static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) { if (key == NULL) { return NULL; } - int tlen = strlen(cgroup); + int32_t tlen = strlen(cgroup); memcpy(key, cgroup, tlen); key[tlen] = ':'; strcpy(key + tlen + 1, topicName); @@ -931,12 +931,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { char *cgroup = subscribe.consumerGroup; SArray *newSub = subscribe.topicNames; - int newTopicNum = subscribe.topicNum; + int32_t newTopicNum = subscribe.topicNum; taosArraySortString(newSub, taosArrayCompareString); SArray *oldSub = NULL; - int oldTopicNum = 0; + int32_t oldTopicNum = 0; bool createConsumer = false; // create consumer if not exist SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); @@ -960,7 +960,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return -1; } - int i = 0, j = 0; + int32_t i = 0, j = 0; while (i < newTopicNum || j < oldTopicNum) { char *newTopicName = NULL; char *oldTopicName = NULL; @@ -975,7 +975,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { newTopicName = taosArrayGetP(newSub, i); oldTopicName = taosArrayGetP(oldSub, j); - int comp = compareLenPrefixedStr(newTopicName, oldTopicName); + int32_t comp = compareLenPrefixedStr(newTopicName, oldTopicName); if (comp == 0) { // do nothing oldTopicName = newTopicName = NULL; @@ -997,12 +997,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { // cancel subscribe of old topic SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, cgroup, oldTopicName); ASSERT(pSub); - int csz = taosArrayGetSize(pSub->consumers); - for (int ci = 0; ci < csz; ci++) { + int32_t csz = taosArrayGetSize(pSub->consumers); + for (int32_t ci = 0; ci < csz; ci++) { SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, ci); if (pSubConsumer->consumerId == consumerId) { - int vgsz = taosArrayGetSize(pSubConsumer->vgInfo); - for (int vgi = 0; vgi < vgsz; vgi++) { + int32_t vgsz = taosArrayGetSize(pSubConsumer->vgInfo); + for (int32_t vgi = 0; vgi < vgsz; vgi++) { SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi); mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp); taosArrayPush(pSub->unassignedVg, pConsumerEp); -- GitLab