提交 779e35b6 编写于 作者: wmmhello's avatar wmmhello

fix:drop consumer wher drop topic or group

上级 e019ed2c
......@@ -33,6 +33,7 @@ enum {
int32_t mndInitConsumer(SMnode *pMnode);
void mndCleanupConsumer(SMnode *pMnode);
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId);
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
......
......@@ -75,6 +75,22 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
if (pClearMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
return;
}
pClearMsg->consumerId = consumerId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
mInfo("consumer:0x%" PRIx64 " drop from sdb", consumerId);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return;
}
bool mndRebTryStart() {
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mDebug("tq timer, rebalance counter old val:%d", old);
......@@ -330,20 +346,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
} else if (status == MQ_CONSUMER_STATUS_LOST) {
// if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD || taosArrayGetSize(pConsumer->assignedTopics) == 0) { // clear consumer if lost a day or unsubscribe/close
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
if (pClearMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d",
pConsumer->consumerId, (int32_t)sizeof(SMqConsumerClearMsg));
continue;
}
pClearMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)};
mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId,
MND_CONSUMER_LOST_CLEAR_THRESHOLD);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
}
} else { // MQ_CONSUMER_STATUS_REBALANCE
taosRLockLatch(&pConsumer->lock);
......
......@@ -760,10 +760,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
}
if (strcmp(dropReq.cgroup, pConsumer->cgroup) == 0) {
sdbRelease(pMnode->pSdb, pConsumer);
terrno = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
return -1;
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
}
sdbRelease(pMnode->pSdb, pConsumer);
}
......
......@@ -697,6 +697,11 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
break;
}
if (pConsumer->status == MQ_CONSUMER_STATUS_LOST){
mndDropConsumerFromSdb(pMnode, pConsumer->consumerId);
continue;
}
int32_t sz = taosArrayGetSize(pConsumer->assignedTopics);
for (int32_t i = 0; i < sz; i++) {
char *name = taosArrayGetP(pConsumer->assignedTopics, i);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册