diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3d6bb060151cd815ff9b0f0d26396e1b8b95e9b8..ea605090f94f142775d3278da9038426f614be70 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -842,7 +842,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbUid; int32_t vgVersion; - int32_t numOfStables; + int32_t numOfStables; int32_t buffer; int32_t pageSize; int32_t pages; @@ -1442,32 +1442,32 @@ typedef struct { SArray* lostConsumers; // SArray SArray* removedConsumers; // SArray SArray* newConsumers; // SArray -} SMqRebSubscribe; +} SMqRebInfo; -static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { - SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)taosMemoryCalloc(1, sizeof(SMqRebSubscribe)); - if (pRebSub == NULL) { +static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) { + SMqRebInfo* pRebInfo = (SMqRebInfo*)taosMemoryCalloc(1, sizeof(SMqRebInfo)); + if (pRebInfo == NULL) { goto _err; } - strcpy(pRebSub->key, key); - pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t)); - if (pRebSub->lostConsumers == NULL) { + strcpy(pRebInfo->key, key); + pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t)); + if (pRebInfo->lostConsumers == NULL) { goto _err; } - pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t)); - if (pRebSub->removedConsumers == NULL) { + pRebInfo->removedConsumers = taosArrayInit(0, sizeof(int64_t)); + if (pRebInfo->removedConsumers == NULL) { goto _err; } - pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t)); - if (pRebSub->newConsumers == NULL) { + pRebInfo->newConsumers = taosArrayInit(0, sizeof(int64_t)); + if (pRebInfo->newConsumers == NULL) { goto _err; } - return pRebSub; + return pRebInfo; _err: - taosArrayDestroy(pRebSub->lostConsumers); - taosArrayDestroy(pRebSub->removedConsumers); - taosArrayDestroy(pRebSub->newConsumers); - taosMemoryFreeClear(pRebSub); + taosArrayDestroy(pRebInfo->lostConsumers); + taosArrayDestroy(pRebInfo->removedConsumers); + taosArrayDestroy(pRebInfo->newConsumers); + taosMemoryFreeClear(pRebInfo); return NULL; } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a2c1d54cab0cbe7e9349e58f928572d09677e660..263fd0bad556bb0404cfc25c390fadbc03ce0520 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -554,9 +554,8 @@ int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogO void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); typedef struct { - const SMqSubscribeObj* pOldSub; - const SMqTopicObj* pTopic; - const SMqRebSubscribe* pRebInfo; + int32_t oldConsumerNum; + const SMqRebInfo* pRebInfo; } SMqRebInputObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index b17772bdb2184c32b92dd87911490222967b24fb..23a87b4691388d7b17ef25f08d3120844d1ca7b1 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -134,15 +134,15 @@ FAIL: return -1; } -static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { - SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); +static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { + SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); if (pRebSub == NULL) { pRebSub = tNewSMqRebSubscribe(key); if (pRebSub == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe)); + taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo)); } return pRebSub; } @@ -189,7 +189,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { char key[TSDB_SUBSCRIBE_KEY_LEN]; char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i); mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); - SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); + SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } taosRUnLockLatch(&pConsumer->lock); @@ -200,7 +200,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { char key[TSDB_SUBSCRIBE_KEY_LEN]; char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i); mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic); - SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); + SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId); } @@ -209,7 +209,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { char key[TSDB_SUBSCRIBE_KEY_LEN]; char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i); mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); - SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); + SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } taosRUnLockLatch(&pConsumer->lock); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 6ff295ec7d4d7de171532d25f6e71b84917b572b..5bce6ac218e29c1bfda8a323af37af78c39c773a 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -277,6 +277,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); taosInitRWLatch(&pSubNew->lock); + pSubNew->dbUid = pSub->dbUid; pSubNew->subType = pSub->subType; pSubNew->withTbName = pSub->withTbName; pSubNew->withSchema = pSub->withSchema; @@ -310,6 +311,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { int32_t tlen = 0; tlen += taosEncodeString(buf, pSub->key); + tlen += taosEncodeFixedI64(buf, pSub->dbUid); tlen += taosEncodeFixedI32(buf, pSub->vgNum); tlen += taosEncodeFixedI8(buf, pSub->subType); tlen += taosEncodeFixedI8(buf, pSub->withTbName); @@ -336,6 +338,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { // buf = taosDecodeStringTo(buf, pSub->key); + buf = taosDecodeFixedI64(buf, &pSub->dbUid); buf = taosDecodeFixedI32(buf, &pSub->vgNum); buf = taosDecodeFixedI8(buf, &pSub->subType); buf = taosDecodeFixedI8(buf, &pSub->withTbName); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index bcec73ed307164fb63f1bf63f31aad0521b24c67..545caea03d350366f6849cc1807519c3849951fe 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -175,27 +175,20 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) return 0; } -static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { - SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); +static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { + SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); if (pRebSub == NULL) { pRebSub = tNewSMqRebSubscribe(key); if (pRebSub == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe)); + taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo)); } return pRebSub; } static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { - if (pInput->pTopic != NULL) { - // create subscribe - pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key); - ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 0); - } else { - pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub); - } int32_t totalVgNum = pOutput->pSub->vgNum; mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); @@ -246,12 +239,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR } // 3. calc vg number of each consumer - int32_t oldSz = 0; - if (pInput->pOldSub) { - oldSz = taosHashGetSize(pInput->pOldSub->consumerHash); - } - int32_t afterRebConsumerNum = - oldSz + taosArrayGetSize(pInput->pRebInfo->newConsumers) - taosArrayGetSize(pInput->pRebInfo->removedConsumers); + int32_t afterRebConsumerNum = pInput->oldConsumerNum + taosArrayGetSize(pInput->pRebInfo->newConsumers) - + taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t minVgCnt = 0; int32_t imbConsumerNum = 0; // calc num @@ -489,22 +478,34 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); - SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter; - SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); + SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; + SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key); + + rebInput.pRebInfo = pRebInfo; if (pSub == NULL) { // split sub key and extract topic char topic[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; - mndSplitSubscribeKey(pRebSub->key, topic, cgroup); + mndSplitSubscribeKey(pRebInfo->key, topic, cgroup); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); ASSERT(pTopic); taosRLockLatch(&pTopic->lock); - rebInput.pTopic = pTopic; - } - rebInput.pRebInfo = pRebSub; - rebInput.pOldSub = pSub; + rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key); + ASSERT(taosHashGetSize(rebOutput.pSub->consumerHash) == 0); + + taosRUnLockLatch(&pTopic->lock); + mndReleaseTopic(pMnode, pTopic); + + rebInput.oldConsumerNum = 0; + } else { + taosRLockLatch(&pSub->lock); + rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); + rebOutput.pSub = tCloneSubscribeObj(pSub); + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + } // TODO replace assert with error check ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); @@ -517,14 +518,6 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { mError("persist rebalance output error, possibly vnode splitted or dropped"); } - - if (rebInput.pTopic) { - SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic; - taosRUnLockLatch(&pTopic->lock); - mndReleaseTopic(pMnode, pTopic); - } else { - mndReleaseSubscribe(pMnode, pSub); - } } // reset flag