提交 a53c181b 编写于 作者: H Haojun Liao

fix(tmq): remove invalid assert

上级 5b25920f
...@@ -196,12 +196,14 @@ FAIL: ...@@ -196,12 +196,14 @@ FAIL:
static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMqConsumerClearMsg *pClearMsg = pMsg->pCont; SMqConsumerClearMsg *pClearMsg = pMsg->pCont;
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId);
return 0; return 0;
} }
mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId, mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId,
mndConsumerStatusName(pConsumer->status)); mndConsumerStatusName(pConsumer->status));
if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) {
...@@ -216,6 +218,8 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { ...@@ -216,6 +218,8 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm");
if (pTrans == NULL) goto FAIL; if (pTrans == NULL) goto FAIL;
// this is the drop action, not the update action
if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL;
if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL;
...@@ -300,6 +304,11 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -300,6 +304,11 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
if (status == MQ_CONSUMER_STATUS__READY) { if (status == MQ_CONSUMER_STATUS__READY) {
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg));
if (pLostMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d",
pConsumer->consumerId, sizeof(SMqConsumerLostMsg));
continue;
}
pLostMsg->consumerId = pConsumer->consumerId; pLostMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -313,6 +322,11 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { ...@@ -313,6 +322,11 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
// if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers.
if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) {
SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg));
if (pClearMsg == NULL) {
mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d",
pConsumer->consumerId, sizeof(SMqConsumerClearMsg));
continue;
}
pClearMsg->consumerId = pConsumer->consumerId; pClearMsg->consumerId = pConsumer->consumerId;
SRpcMsg rpcMsg = { SRpcMsg rpcMsg = {
...@@ -871,7 +885,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { ...@@ -871,7 +885,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) {
if (status == MQ_CONSUMER_STATUS_REBALANCE) { if (status == MQ_CONSUMER_STATUS_REBALANCE) {
pConsumer->status = MQ_CONSUMER_STATUS__READY; pConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST) { } else if (status == MQ_CONSUMER_STATUS__LOST) {
ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0 && taosArrayGetSize(pConsumer->assignedTopics) == 0); ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0);
pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
} }
} }
......
...@@ -480,14 +480,16 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -480,14 +480,16 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
for (int32_t i = 0; i < vgNum; i++) { for (int32_t i = 0; i < vgNum; i++) {
SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) {
goto REB_FAIL; mndTransDrop(pTrans);
return -1;
} }
} }
// 2. redo log: subscribe and vg assignment // 2. redo log: subscribe and vg assignment
// subscribe // subscribe
if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
goto REB_FAIL; mndTransDrop(pTrans);
return -1;
} }
// 3. commit log: consumer to update status and epoch // 3. commit log: consumer to update status and epoch
...@@ -502,11 +504,15 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -502,11 +504,15 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
goto REB_FAIL;
mndTransDrop(pTrans);
return -1;
} }
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
} }
// 3.2 set new consumer // 3.2 set new consumer
consumerNum = taosArrayGetSize(pOutput->newConsumers); consumerNum = taosArrayGetSize(pOutput->newConsumers);
for (int32_t i = 0; i < consumerNum; i++) { for (int32_t i = 0; i < consumerNum; i++) {
...@@ -523,8 +529,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -523,8 +529,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
goto REB_FAIL;
mndTransDrop(pTrans);
return -1;
} }
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
} }
...@@ -545,8 +554,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -545,8 +554,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
goto REB_FAIL;
mndTransDrop(pTrans);
return -1;
} }
tDeleteSMqConsumerObj(pConsumerNew); tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew); taosMemoryFree(pConsumerNew);
} }
...@@ -559,15 +571,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -559,15 +571,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 6. execution // 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("failed to prepare trans rebalance since %s", terrstr()); mError("failed to prepare trans rebalance since %s", terrstr());
goto REB_FAIL; mndTransDrop(pTrans);
return -1;
} }
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return 0;
REB_FAIL:
mndTransDrop(pTrans);
return -1;
} }
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册