提交 5b25920f 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 79342cf1
...@@ -672,7 +672,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -672,7 +672,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
} else { } else {
/*taosRLockLatch(&pExistedConsumer->lock);*/
int32_t status = atomic_load_32(&pExistedConsumer->status); int32_t status = atomic_load_32(&pExistedConsumer->status);
mInfo("receive subscribe request from existed consumer:0x%" PRIx64 mInfo("receive subscribe request from existed consumer:0x%" PRIx64
...@@ -881,7 +880,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { ...@@ -881,7 +880,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
// remove from new topic // remove from new topic
static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < taosArrayGetSize(pConsumer->rebNewTopics); i++) { for (int32_t i = 0; i < size; i++) {
char *p = taosArrayGetP(pConsumer->rebNewTopics, i); char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
if (strcmp(pTopic, p) == 0) { if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebNewTopics, i); taosArrayRemove(pConsumer->rebNewTopics, i);
...@@ -902,9 +901,42 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo ...@@ -902,9 +901,42 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo
if (strcmp(pTopic, p) == 0) { if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebRemovedTopics, i); taosArrayRemove(pConsumer->rebRemovedTopics, i);
taosMemoryFree(p); taosMemoryFree(p);
mDebug("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics));
break;
}
}
}
static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) {
int32_t sz = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
if (strcmp(pTopic, topic) == 0) {
taosArrayRemove(pConsumer->currentTopics, i);
taosMemoryFree(topic);
mDebug("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d",
pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics));
break;
}
}
}
static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) {
bool existing = false;
int32_t size = taosArrayGetSize(pConsumer->currentTopics);
for (int32_t i = 0; i < size; i++) {
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
if (strcmp(topic, pTopic) == 0) {
existing = true;
break; break;
} }
} }
return existing;
} }
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
...@@ -951,24 +983,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -951,24 +983,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// not exist in current topic // check if exist in current topic
bool existing = false;
int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
for (int32_t i = 0; i < numOfExistedTopics; i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
if (strcmp(topic, pNewTopic) == 0) {
existing = true;
}
}
removeFromNewTopicList(pOldConsumer, pNewTopic); removeFromNewTopicList(pOldConsumer, pNewTopic);
// add to current topic // add to current topic
if (!existing) { bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic);
if (existing) {
taosMemoryFree(pNewTopic);
} else { // added into current topic list
taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
} else {
taosMemoryFree(pNewTopic);
} }
// set status // set status
...@@ -993,16 +1017,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -993,16 +1017,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
removeFromRemoveTopicList(pOldConsumer, removedTopic); removeFromRemoveTopicList(pOldConsumer, removedTopic);
// remove from current topic // remove from current topic
int32_t i = 0; removeFromCurrentTopicList(pOldConsumer, removedTopic);
int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics);
for (i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
if (strcmp(removedTopic, topic) == 0) {
taosArrayRemove(pOldConsumer->currentTopics, i);
taosMemoryFree(topic);
break;
}
}
// set status // set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
......
...@@ -213,13 +213,9 @@ static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, ...@@ -213,13 +213,9 @@ static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash,
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < consumerVgNum; j++) { for (int32_t j = 0; j < consumerVgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
SMqRebOutputVg outputVg = {
.oldConsumerId = consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId); mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
} }
...@@ -584,16 +580,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -584,16 +580,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
while (1) { while (1) {
// if (rebalanceOnce) {
// break;
// }
pIter = taosHashIterate(pReq->rebSubHash, pIter); pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) { if (pIter == NULL) {
break; break;
} }
// todo handle the malloc failure
SMqRebInputObj rebInput = {0}; SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0}; SMqRebOutputObj rebOutput = {0};
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
...@@ -601,6 +592,20 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -601,6 +592,20 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg));
if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL ||
rebOutput.rebVgs == NULL) {
taosArrayDestroy(rebOutput.newConsumers);
taosArrayDestroy(rebOutput.removedConsumers);
taosArrayDestroy(rebOutput.modifyConsumers);
taosArrayDestroy(rebOutput.rebVgs);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mInfo("mq re-balance failed, due to out of memory");
taosHashCleanup(pReq->rebSubHash);
mndRebEnd();
return -1;
}
SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter;
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key); SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key);
...@@ -640,6 +645,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -640,6 +645,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
rebOutput.pSub = tCloneSubscribeObj(pSub); rebOutput.pSub = tCloneSubscribeObj(pSub);
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} }
...@@ -661,9 +667,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -661,9 +667,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosArrayDestroy(rebOutput.rebVgs); taosArrayDestroy(rebOutput.rebVgs);
tDeleteSubscribeObj(rebOutput.pSub); tDeleteSubscribeObj(rebOutput.pSub);
taosMemoryFree(rebOutput.pSub); taosMemoryFree(rebOutput.pSub);
// taosSsleep(100);
// rebalanceOnce = true;
} }
// reset flag // reset flag
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册