提交 3496381f 编写于 作者: L Liu Jicong

refactor: rebalance input

上级 ca786a92
...@@ -1442,32 +1442,32 @@ typedef struct { ...@@ -1442,32 +1442,32 @@ typedef struct {
SArray* lostConsumers; // SArray<int64_t> SArray* lostConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t> SArray* removedConsumers; // SArray<int64_t>
SArray* newConsumers; // SArray<int64_t> SArray* newConsumers; // SArray<int64_t>
} SMqRebSubscribe; } SMqRebInfo;
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) { static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)taosMemoryCalloc(1, sizeof(SMqRebSubscribe)); SMqRebInfo* pRebInfo = (SMqRebInfo*)taosMemoryCalloc(1, sizeof(SMqRebInfo));
if (pRebSub == NULL) { if (pRebInfo == NULL) {
goto _err; goto _err;
} }
strcpy(pRebSub->key, key); strcpy(pRebInfo->key, key);
pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t)); pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->lostConsumers == NULL) { if (pRebInfo->lostConsumers == NULL) {
goto _err; goto _err;
} }
pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t)); pRebInfo->removedConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->removedConsumers == NULL) { if (pRebInfo->removedConsumers == NULL) {
goto _err; goto _err;
} }
pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t)); pRebInfo->newConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebSub->newConsumers == NULL) { if (pRebInfo->newConsumers == NULL) {
goto _err; goto _err;
} }
return pRebSub; return pRebInfo;
_err: _err:
taosArrayDestroy(pRebSub->lostConsumers); taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebSub->removedConsumers); taosArrayDestroy(pRebInfo->removedConsumers);
taosArrayDestroy(pRebSub->newConsumers); taosArrayDestroy(pRebInfo->newConsumers);
taosMemoryFreeClear(pRebSub); taosMemoryFreeClear(pRebInfo);
return NULL; return NULL;
} }
......
...@@ -554,9 +554,8 @@ int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogO ...@@ -554,9 +554,8 @@ int32_t tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogO
void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog); void* tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);
typedef struct { typedef struct {
const SMqSubscribeObj* pOldSub; int32_t oldConsumerNum;
const SMqTopicObj* pTopic; const SMqRebInfo* pRebInfo;
const SMqRebSubscribe* pRebInfo;
} SMqRebInputObj; } SMqRebInputObj;
typedef struct { typedef struct {
......
...@@ -134,15 +134,15 @@ FAIL: ...@@ -134,15 +134,15 @@ FAIL:
return -1; return -1;
} }
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
if (pRebSub == NULL) { if (pRebSub == NULL) {
pRebSub = tNewSMqRebSubscribe(key); pRebSub = tNewSMqRebSubscribe(key);
if (pRebSub == NULL) { if (pRebSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe)); taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
} }
return pRebSub; return pRebSub;
} }
...@@ -189,7 +189,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { ...@@ -189,7 +189,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i); char *removedTopic = taosArrayGetP(pConsumer->currentTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
...@@ -200,7 +200,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { ...@@ -200,7 +200,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i); char *newTopic = taosArrayGetP(pConsumer->rebNewTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic); mndMakeSubscribeKey(key, pConsumer->cgroup, newTopic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
} }
...@@ -209,7 +209,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) { ...@@ -209,7 +209,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
char key[TSDB_SUBSCRIBE_KEY_LEN]; char key[TSDB_SUBSCRIBE_KEY_LEN];
char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i); char *removedTopic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic); mndMakeSubscribeKey(key, pConsumer->cgroup, removedTopic);
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key); SMqRebInfo *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
} }
taosRUnLockLatch(&pConsumer->lock); taosRUnLockLatch(&pConsumer->lock);
......
...@@ -277,6 +277,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { ...@@ -277,6 +277,7 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
taosInitRWLatch(&pSubNew->lock); taosInitRWLatch(&pSubNew->lock);
pSubNew->dbUid = pSub->dbUid;
pSubNew->subType = pSub->subType; pSubNew->subType = pSub->subType;
pSubNew->withTbName = pSub->withTbName; pSubNew->withTbName = pSub->withTbName;
pSubNew->withSchema = pSub->withSchema; pSubNew->withSchema = pSub->withSchema;
...@@ -310,6 +311,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { ...@@ -310,6 +311,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeString(buf, pSub->key); tlen += taosEncodeString(buf, pSub->key);
tlen += taosEncodeFixedI64(buf, pSub->dbUid);
tlen += taosEncodeFixedI32(buf, pSub->vgNum); tlen += taosEncodeFixedI32(buf, pSub->vgNum);
tlen += taosEncodeFixedI8(buf, pSub->subType); tlen += taosEncodeFixedI8(buf, pSub->subType);
tlen += taosEncodeFixedI8(buf, pSub->withTbName); tlen += taosEncodeFixedI8(buf, pSub->withTbName);
...@@ -336,6 +338,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { ...@@ -336,6 +338,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) { void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
// //
buf = taosDecodeStringTo(buf, pSub->key); buf = taosDecodeStringTo(buf, pSub->key);
buf = taosDecodeFixedI64(buf, &pSub->dbUid);
buf = taosDecodeFixedI32(buf, &pSub->vgNum); buf = taosDecodeFixedI32(buf, &pSub->vgNum);
buf = taosDecodeFixedI8(buf, &pSub->subType); buf = taosDecodeFixedI8(buf, &pSub->subType);
buf = taosDecodeFixedI8(buf, &pSub->withTbName); buf = taosDecodeFixedI8(buf, &pSub->withTbName);
......
...@@ -175,27 +175,20 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) ...@@ -175,27 +175,20 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup)
return 0; return 0;
} }
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key) + 1); SMqRebInfo *pRebSub = taosHashGet(pHash, key, strlen(key) + 1);
if (pRebSub == NULL) { if (pRebSub == NULL) {
pRebSub = tNewSMqRebSubscribe(key); pRebSub = tNewSMqRebSubscribe(key);
if (pRebSub == NULL) { if (pRebSub == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebSubscribe)); taosHashPut(pHash, key, strlen(key) + 1, pRebSub, sizeof(SMqRebInfo));
} }
return pRebSub; return pRebSub;
} }
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
if (pInput->pTopic != NULL) {
// create subscribe
pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key);
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 0);
} else {
pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub);
}
int32_t totalVgNum = pOutput->pSub->vgNum; int32_t totalVgNum = pOutput->pSub->vgNum;
mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum); mInfo("mq rebalance subscription: %s, vgNum: %d", pOutput->pSub->key, pOutput->pSub->vgNum);
...@@ -246,12 +239,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -246,12 +239,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
// 3. calc vg number of each consumer // 3. calc vg number of each consumer
int32_t oldSz = 0; int32_t afterRebConsumerNum = pInput->oldConsumerNum + taosArrayGetSize(pInput->pRebInfo->newConsumers) -
if (pInput->pOldSub) { taosArrayGetSize(pInput->pRebInfo->removedConsumers);
oldSz = taosHashGetSize(pInput->pOldSub->consumerHash);
}
int32_t afterRebConsumerNum =
oldSz + taosArrayGetSize(pInput->pRebInfo->newConsumers) - taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t minVgCnt = 0; int32_t minVgCnt = 0;
int32_t imbConsumerNum = 0; int32_t imbConsumerNum = 0;
// calc num // calc num
...@@ -489,22 +478,34 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { ...@@ -489,22 +478,34 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *)); rebOutput.touchedConsumers = taosArrayInit(0, sizeof(void *));
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter; SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key); SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);
rebInput.pRebInfo = pRebInfo;
if (pSub == NULL) { if (pSub == NULL) {
// split sub key and extract topic // split sub key and extract topic
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pRebSub->key, topic, cgroup); mndSplitSubscribeKey(pRebInfo->key, topic, cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
ASSERT(pTopic); ASSERT(pTopic);
taosRLockLatch(&pTopic->lock); taosRLockLatch(&pTopic->lock);
rebInput.pTopic = pTopic;
}
rebInput.pRebInfo = pRebSub; rebOutput.pSub = mndCreateSub(pMnode, pTopic, pRebInfo->key);
rebInput.pOldSub = pSub; ASSERT(taosHashGetSize(rebOutput.pSub->consumerHash) == 0);
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
rebInput.oldConsumerNum = 0;
} else {
taosRLockLatch(&pSub->lock);
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
rebOutput.pSub = tCloneSubscribeObj(pSub);
taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub);
}
// TODO replace assert with error check // TODO replace assert with error check
ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0); ASSERT(mndDoRebalance(pMnode, &rebInput, &rebOutput) == 0);
...@@ -517,14 +518,6 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) { ...@@ -517,14 +518,6 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) { if (mndPersistRebResult(pMnode, pMsg, &rebOutput) < 0) {
mError("persist rebalance output error, possibly vnode splitted or dropped"); mError("persist rebalance output error, possibly vnode splitted or dropped");
} }
if (rebInput.pTopic) {
SMqTopicObj *pTopic = (SMqTopicObj *)rebInput.pTopic;
taosRUnLockLatch(&pTopic->lock);
mndReleaseTopic(pMnode, pTopic);
} else {
mndReleaseSubscribe(pMnode, pSub);
}
} }
// reset flag // reset flag
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册