未验证 提交 10f08c94 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #20679 from taosdata/feature/3_liaohj

fix(query): allow only one trans to be execute for each balance, and do some other refactor.
...@@ -998,7 +998,8 @@ TEST(clientCase, sub_db_test) { ...@@ -998,7 +998,8 @@ TEST(clientCase, sub_db_test) {
// 创建订阅 topics 列表 // 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new(); tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_db1"); tmq_list_append(topicList, "topic_t1");
tmq_list_append(topicList, "topic_s2");
// 启动订阅 // 启动订阅
tmq_subscribe(tmq, topicList); tmq_subscribe(tmq, topicList);
......
...@@ -247,6 +247,13 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { ...@@ -247,6 +247,13 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebInfo; return pRebInfo;
} }
static void freeRebalanceItem(void* param) {
SMqRebInfo* pInfo = param;
taosArrayDestroy(pInfo->lostConsumers);
taosArrayDestroy(pInfo->newConsumers);
taosArrayDestroy(pInfo->removedConsumers);
}
static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -262,8 +269,21 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -262,8 +269,21 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} }
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg)); SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
if (pRebMsg == NULL) {
mError("failed to create the rebalance msg, size:%d, quit mq timer", (int32_t) sizeof(SMqDoRebalanceMsg));
mndRebEnd();
return TSDB_CODE_OUT_OF_MEMORY;
}
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
// TODO set cleanfp if (pRebMsg->rebSubHash == NULL) {
mError("failed to create rebalance hashmap");
rpcFreeCont(pRebMsg);
mndRebEnd();
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashSetFreeFp(pRebMsg->rebSubHash, freeRebalanceItem);
// iterate all consumers, find all modification // iterate all consumers, find all modification
while (1) { while (1) {
...@@ -356,7 +376,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -356,7 +376,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} else { } else {
taosHashCleanup(pRebMsg->rebSubHash); taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg); rpcFreeCont(pRebMsg);
mDebug("mq rebalance finished, no modification"); mDebug("mq timer finished, no need to re-balance");
mndRebEnd(); mndRebEnd();
} }
return 0; return 0;
...@@ -852,6 +872,53 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) { ...@@ -852,6 +872,53 @@ static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
return 0; return 0;
} }
static void updateConsumerStatus(SMqConsumerObj* pConsumer) {
int32_t status = pConsumer->status;
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__LOST;
}
}
}
// remove from new topic
static void removeFromNewTopicList(SMqConsumerObj* pConsumer, const char* pTopic) {
int32_t size = taosArrayGetSize(pConsumer->rebNewTopics);
for (int32_t i = 0; i < taosArrayGetSize(pConsumer->rebNewTopics); i++) {
char *p = taosArrayGetP(pConsumer->rebNewTopics, i);
if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebNewTopics, i);
taosMemoryFree(p);
mDebug("consumer:0x%" PRIx64 " remove new topic:%s in the topic list, remain newTopics:%d", pConsumer->consumerId,
pTopic, (int) taosArrayGetSize(pConsumer->rebNewTopics));
break;
}
}
}
// remove from removed topic
static void removeFromRemoveTopicList(SMqConsumerObj* pConsumer, const char* pTopic) {
int32_t size = taosArrayGetSize(pConsumer->rebRemovedTopics);
for (int32_t i = 0; i < size; i++) {
char *p = taosArrayGetP(pConsumer->rebRemovedTopics, i);
if (strcmp(pTopic, p) == 0) {
taosArrayRemove(pConsumer->rebRemovedTopics, i);
taosMemoryFree(p);
break;
}
}
}
static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) {
mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64, mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64,
pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime); pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime);
...@@ -862,6 +929,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -862,6 +929,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
/*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/ /*A(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0);*/
/*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/ /*A(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0);*/
// this new consumer has identical topics with one existed consumers.
if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) { if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY; pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else { } else {
...@@ -878,7 +946,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -878,7 +946,6 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pNewConsumer->assignedTopics = tmp; pNewConsumer->assignedTopics = tmp;
pOldConsumer->subscribeTime = pNewConsumer->upTime; pOldConsumer->subscribeTime = pNewConsumer->upTime;
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
} }
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) {
...@@ -918,71 +985,48 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -918,71 +985,48 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->rebalanceTime = pNewConsumer->upTime;
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) {
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1);*/ ASSERT(taosArrayGetSize(pNewConsumer->rebNewTopics) == 1 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0);*/ char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
char *addedTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0));
// not exist in current topic // not exist in current topic
bool existing = false;
bool existing = false;
#if 1
int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics); int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics);
for (int32_t i = 0; i < numOfExistedTopics; i++) { for (int32_t i = 0; i < numOfExistedTopics; i++) {
char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); char *topic = taosArrayGetP(pOldConsumer->currentTopics, i);
if (strcmp(topic, addedTopic) == 0) { if (strcmp(topic, pNewTopic) == 0) {
existing = true; existing = true;
} }
} }
#endif
// remove from new topic removeFromNewTopicList(pOldConsumer, pNewTopic);
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
if (strcmp(addedTopic, topic) == 0) {
taosArrayRemove(pOldConsumer->rebNewTopics, i);
taosMemoryFree(topic);
break;
}
}
// add to current topic // add to current topic
if (!existing) { if (!existing) {
taosArrayPush(pOldConsumer->currentTopics, &addedTopic); taosArrayPush(pOldConsumer->currentTopics, &pNewTopic);
taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString);
} else { } else {
taosMemoryFree(addedTopic); taosMemoryFree(pNewTopic);
} }
// set status // set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { updateConsumerStatus(pOldConsumer);
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
}
}
// the re-balance is triggered when the new consumer is launched. // the re-balance is triggered when the new consumer is launched.
pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->rebalanceTime = pNewConsumer->upTime;
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64 ", current topics:%d", mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics)); (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) { } else if (pNewConsumer->updateType == CONSUMER_UPDATE__REMOVE) {
/*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/ /*A(taosArrayGetSize(pNewConsumer->rebNewTopics) == 0);*/
/*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/ /*A(taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 1);*/
char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0); char *removedTopic = taosArrayGetP(pNewConsumer->rebRemovedTopics, 0);
// not exist in new topic
#if 0 #if 0
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) { for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebNewTopics); i++) {
char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i); char *topic = taosArrayGetP(pOldConsumer->rebNewTopics, i);
...@@ -991,14 +1035,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -991,14 +1035,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
#endif #endif
// remove from removed topic // remove from removed topic
for (int32_t i = 0; i < taosArrayGetSize(pOldConsumer->rebRemovedTopics); i++) { removeFromRemoveTopicList(pOldConsumer, removedTopic);
char *topic = taosArrayGetP(pOldConsumer->rebRemovedTopics, i);
if (strcmp(removedTopic, topic) == 0) {
taosArrayRemove(pOldConsumer->rebRemovedTopics, i);
taosMemoryFree(topic);
break;
}
}
// remove from current topic // remove from current topic
int32_t i = 0; int32_t i = 0;
...@@ -1011,32 +1048,20 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, ...@@ -1011,32 +1048,20 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
break; break;
} }
} }
// must find the topic
/*A(i < sz);*/
// set status // set status
int32_t status = pOldConsumer->status; int32_t status = pOldConsumer->status;
if (taosArrayGetSize(pOldConsumer->rebNewTopics) == 0 && taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0) { updateConsumerStatus(pOldConsumer);
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY_IN_REB;
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pOldConsumer->status = MQ_CONSUMER_STATUS__LOST_IN_REB;
}
}
pOldConsumer->rebalanceTime = pNewConsumer->upTime; pOldConsumer->rebalanceTime = pNewConsumer->upTime;
atomic_add_fetch_32(&pOldConsumer->epoch, 1); atomic_add_fetch_32(&pOldConsumer->epoch, 1);
mDebug("consumer:0x%" PRIx64 " state %d(%s) -> %d(%s), new epoch:%d, reb-time:%" PRId64 ", current topics:%d", mDebug("consumer:0x%" PRIx64 " state (%d)%s -> (%d)%s, new epoch:%d, reb-time:%" PRId64
", current topics:%d, newTopics:%d, removeTopics:%d",
pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status, pOldConsumer->consumerId, status, mndConsumerStatusName(status), pOldConsumer->status,
mndConsumerStatusName(pOldConsumer->status), mndConsumerStatusName(pOldConsumer->status), pOldConsumer->epoch, pOldConsumer->rebalanceTime,
pOldConsumer->epoch, pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->currentTopics)); (int)taosArrayGetSize(pOldConsumer->currentTopics), (int)taosArrayGetSize(pOldConsumer->rebNewTopics),
(int)taosArrayGetSize(pOldConsumer->rebRemovedTopics));
} }
taosWUnLockLatch(&pOldConsumer->lock); taosWUnLockLatch(&pOldConsumer->lock);
......
...@@ -197,24 +197,20 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { ...@@ -197,24 +197,20 @@ static SMqRebInfo *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
return pRebSub; return pRebSub;
} }
static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) { static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, const SMqRebInputObj *pInput) {
int32_t totalVgNum = pOutput->pSub->vgNum; int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
const char *sub = pOutput->pSub->key; const char *pSubKey = pOutput->pSub->key;
mInfo("sub:%s mq re-balance %d vgroups", sub, pOutput->pSub->vgNum);
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
// 2. check and get actual removed consumers, put their vg into hash
int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t actualRemoved = 0; int32_t actualRemoved = 0;
for (int32_t i = 0; i < removedNum; i++) { for (int32_t i = 0; i < numOfRemoved; i++) {
uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i); uint64_t consumerId = *(uint64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
// consumer exists till now
if (pConsumerEp) { if (pConsumerEp) {
actualRemoved++; actualRemoved++;
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
for (int32_t j = 0; j < consumerVgNum; j++) { for (int32_t j = 0; j < consumerVgNum; j++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
...@@ -223,52 +219,66 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -223,52 +219,66 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.newConsumerId = -1, .newConsumerId = -1,
.pVgEp = pVgEp, .pVgEp = pVgEp,
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, sub, pVgEp->vgId, consumerId); mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId);
} }
taosArrayDestroy(pConsumerEp->vgs); taosArrayDestroy(pConsumerEp->vgs);
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t)); taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
// put into removed // put into removed
taosArrayPush(pOutput->removedConsumers, &consumerId); taosArrayPush(pOutput->removedConsumers, &consumerId);
} }
} }
if (removedNum != actualRemoved) { if (numOfRemoved != actualRemoved) {
mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", sub, removedNum, actualRemoved); mError("sub:%s mq re-balance removedNum:%d not matched with actual:%d", pSubKey, numOfRemoved, actualRemoved);
} else {
mInfo("sub:%s removed %d consumers", pSubKey, numOfRemoved);
} }
}
// if previously no consumer, there are vgs not assigned static void doAddNewConsumers(SMqRebOutputObj *pOutput, const SMqRebInputObj *pInput) {
{ int32_t numOfNewConsumers = taosArrayGetSize(pInput->pRebInfo->newConsumers);
int32_t consumerVgNum = taosArrayGetSize(pOutput->pSub->unassignedVgs); const char *pSubKey = pOutput->pSub->key;
for (int32_t i = 0; i < consumerVgNum; i++) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs); for (int32_t i = 0; i < numOfNewConsumers; i++) {
SMqRebOutputVg rebOutput = { int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
.oldConsumerId = -1,
.newConsumerId = -1, SMqConsumerEp newConsumerEp;
.pVgEp = pVgEp, newConsumerEp.consumerId = consumerId;
}; newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", sub, pVgEp->vgId); taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
} taosArrayPush(pOutput->newConsumers, &consumerId);
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, pSubKey, consumerId);
} }
}
// 3. calc vg number of each consumer static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj* pHash) {
int32_t afterRebConsumerNum = pInput->oldConsumerNum + taosArrayGetSize(pInput->pRebInfo->newConsumers) - const char *pSubKey = pOutput->pSub->key;
taosArrayGetSize(pInput->pRebInfo->removedConsumers); int32_t numOfVgroups = taosArrayGetSize(pOutput->pSub->unassignedVgs);
int32_t minVgCnt = 0;
int32_t imbConsumerNum = 0; for (int32_t i = 0; i < numOfVgroups; i++) {
// calc num SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
if (afterRebConsumerNum) { SMqRebOutputVg rebOutput = {
minVgCnt = totalVgNum / afterRebConsumerNum; .oldConsumerId = -1,
imbConsumerNum = totalVgNum % afterRebConsumerNum; .newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &rebOutput, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq re-balance remove vgId:%d from unassigned", pSubKey, pVgEp->vgId);
} }
}
mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has more vgs", sub, static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj* pHash, int32_t minVgCnt, int32_t imbConsumerNum) {
afterRebConsumerNum, minVgCnt, imbConsumerNum); const char *pSubKey = pOutput->pSub->key;
// 4. first scan: remove consumer more than wanted, put to remove hash
int32_t imbCnt = 0; int32_t imbCnt = 0;
void *pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) { if (pIter == NULL) {
...@@ -276,8 +286,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -276,8 +286,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
// all old consumers still existing are touched // all old consumers still existing are touched
// TODO optimize: touch only consumer whose vgs changed // TODO optimize: touch only consumer whose vgs changed
taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId); taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId);
...@@ -296,13 +306,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -296,13 +306,13 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp, .pVgEp = pVgEp,
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", sub, pVgEp->vgId, mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
pConsumerEp->consumerId); pConsumerEp->consumerId);
} }
imbCnt++; imbCnt++;
} }
} else { } else {
// pop until equal minVg // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
SMqRebOutputVg outputVg = { SMqRebOutputVg outputVg = {
...@@ -311,36 +321,67 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -311,36 +321,67 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
.pVgEp = pVgEp, .pVgEp = pVgEp,
}; };
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", sub, pVgEp->vgId, mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
pConsumerEp->consumerId); pConsumerEp->consumerId);
} }
} }
} }
} }
}
// 5. add new consumer into sub static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqRebOutputObj *pOutput) {
{ int32_t totalVgNum = pOutput->pSub->vgNum;
int32_t consumerNum = taosArrayGetSize(pInput->pRebInfo->newConsumers); const char *pSubKey = pOutput->pSub->key;
for (int32_t i = 0; i < consumerNum; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i); int32_t numOfRemoved = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
int32_t numOfAdded = taosArrayGetSize(pInput->pRebInfo->newConsumers);
SMqConsumerEp newConsumerEp; mInfo("sub:%s mq re-balance %d vgroups, existed consumers:%d, added:%d, removed:%d", pSubKey, totalVgNum,
newConsumerEp.consumerId = consumerId; pInput->oldConsumerNum, numOfAdded, numOfRemoved);
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp)); // 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
taosArrayPush(pOutput->newConsumers, &consumerId); SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, sub, consumerId);
} // 2. check and get actual removed consumers, put their vg into hash
doRemoveExistedConsumers(pOutput, pHash, pInput);
// 3. if previously no consumer, there are vgs not assigned
addUnassignedVgroups(pOutput, pHash);
// 4. calc vg number of each consumer
int32_t numOfFinal = pInput->oldConsumerNum + numOfAdded - numOfRemoved;
int32_t minVgCnt = 0;
int32_t imbConsumerNum = 0;
// calc num
if (numOfFinal) {
minVgCnt = totalVgNum / numOfFinal;
imbConsumerNum = totalVgNum % numOfFinal;
mInfo("sub:%s mq re-balance %d consumers: at least %d vgs each, %d consumers has 1 more vgroups than avg value",
pSubKey, numOfFinal, minVgCnt, imbConsumerNum);
} else {
mInfo("sub:%s no consumer subscribe this topic", pSubKey);
} }
// 6. second scan: find consumer do not have enough vg, extract from temporary hash and assign to new consumer. // 5. first scan: remove vgroups from te consumers, who have more vgroups than the threashold value that is
// minVgCnt, and then put them into the recycled hash list
transferVgroupsForConsumers(pOutput, pHash, minVgCnt, imbConsumerNum);
// 6. add new consumer into sub
doAddNewConsumers(pOutput, pInput);
// 7. second scan: find consumer do not have enough vgroups, extract from temporary hash and assign to them
// All related vg should be put into rebVgs // All related vg should be put into rebVgs
SMqRebOutputVg *pRebVg = NULL; SMqRebOutputVg *pRebVg = NULL;
void *pRemovedIter = NULL; void *pRemovedIter = NULL;
pIter = NULL; void *pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter); pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
if (pIter == NULL) break; if (pIter == NULL) {
break;
}
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
// push until equal minVg // push until equal minVg
...@@ -348,8 +389,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -348,8 +389,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
// iter hash and find one vg // iter hash and find one vg
pRemovedIter = taosHashIterate(pHash, pRemovedIter); pRemovedIter = taosHashIterate(pHash, pRemovedIter);
if (pRemovedIter == NULL) { if (pRemovedIter == NULL) {
mError("sub:%s removed iter is null", sub); mError("sub:%s removed iter is null", pSubKey);
continue; break;
} }
pRebVg = (SMqRebOutputVg *)pRemovedIter; pRebVg = (SMqRebOutputVg *)pRemovedIter;
...@@ -409,15 +450,15 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -409,15 +450,15 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
taosArrayPush(pOutput->rebVgs, pRebOutput); taosArrayPush(pOutput->rebVgs, pRebOutput);
mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", sub, pRebOutput->pVgEp->vgId); mInfo("sub:%s mq re-balance unassign vgId:%d (second scan)", pSubKey, pRebOutput->pVgEp->vgId);
} }
} }
// 8. generate logs // 8. generate logs
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", sub); mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pSubKey);
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, sub, mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pSubKey,
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
} }
{ {
...@@ -427,10 +468,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -427,10 +468,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if (pIter == NULL) break; if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs); int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", sub, pConsumerEp->consumerId, sz); mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pSubKey, pConsumerEp->consumerId, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, sub, pVgEp->vgId, mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pSubKey, pVgEp->vgId,
pConsumerEp->consumerId); pConsumerEp->consumerId);
} }
} }
...@@ -555,17 +596,23 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -555,17 +596,23 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqDoRebalanceMsg *pReq = pMsg->pCont; SMqDoRebalanceMsg *pReq = pMsg->pCont;
void *pIter = NULL; void *pIter = NULL;
bool rebalanceOnce = false; // to ensure only once.
mInfo("mq re-balance start"); mInfo("mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(pReq->rebSubHash));
// here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction.
while (1) { while (1) {
if (rebalanceOnce) {
break;
}
pIter = taosHashIterate(pReq->rebSubHash, pIter); pIter = taosHashIterate(pReq->rebSubHash, pIter);
if (pIter == NULL) { if (pIter == NULL) {
break; break;
} }
// todo handle the malloc failure
SMqRebInputObj rebInput = {0}; SMqRebInputObj rebInput = {0};
SMqRebOutputObj rebOutput = {0}; SMqRebOutputObj rebOutput = {0};
rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t));
rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.removedConsumers = taosArrayInit(0, sizeof(int64_t));
...@@ -582,9 +629,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -582,9 +629,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
char topic[TSDB_TOPIC_FNAME_LEN]; char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN]; char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true); mndSplitSubscribeKey(pRebInfo->key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic == NULL) { if (pTopic == NULL) {
mError("mq re-balance %s ignored since topic %s not exist", pRebInfo->key, topic); mError("mq re-balance %s ignored since topic %s doesn't exist", pRebInfo->key, topic);
continue; continue;
} }
...@@ -604,11 +652,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -604,11 +652,13 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
rebInput.oldConsumerNum = 0; rebInput.oldConsumerNum = 0;
mInfo("topic:%s has no consumers sub yet", topic);
} else { } else {
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash);
rebOutput.pSub = tCloneSubscribeObj(pSub); rebOutput.pSub = tCloneSubscribeObj(pSub);
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mInfo("topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} }
...@@ -623,16 +673,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -623,16 +673,14 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mError("mq re-balance persist output error, possibly vnode splitted or dropped"); mError("mq re-balance persist output error, possibly vnode splitted or dropped");
} }
taosArrayDestroy(pRebInfo->lostConsumers);
taosArrayDestroy(pRebInfo->newConsumers);
taosArrayDestroy(pRebInfo->removedConsumers);
taosArrayDestroy(rebOutput.newConsumers); taosArrayDestroy(rebOutput.newConsumers);
taosArrayDestroy(rebOutput.touchedConsumers); taosArrayDestroy(rebOutput.touchedConsumers);
taosArrayDestroy(rebOutput.removedConsumers); taosArrayDestroy(rebOutput.removedConsumers);
taosArrayDestroy(rebOutput.rebVgs); taosArrayDestroy(rebOutput.rebVgs);
tDeleteSubscribeObj(rebOutput.pSub); tDeleteSubscribeObj(rebOutput.pSub);
taosMemoryFree(rebOutput.pSub); taosMemoryFree(rebOutput.pSub);
rebalanceOnce = true;
} }
// reset flag // reset flag
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册