提交 4a17f4b9 编写于 作者: wmmhello's avatar wmmhello

fix:add rows to pSub if rebalance

上级 1fc66469
...@@ -1190,7 +1190,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -1190,7 +1190,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal); tFormatOffset(buf, TSDB_OFFSET_LEN, &pVal);
char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; char parasStr[64 + TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0};
sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%d,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf); sprintf(varDataVal(parasStr), "tbname:%d,commit:%d,interval:%dms,reset:%s", pConsumer->withTbName, pConsumer->autoCommit, pConsumer->autoCommitInterval, buf);
varDataSetLen(parasStr, strlen(varDataVal(parasStr))); varDataSetLen(parasStr, strlen(varDataVal(parasStr)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......
...@@ -468,7 +468,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -468,7 +468,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
} }
if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed // if(taosHashGetSize(pOutput->pSub->consumerHash) == 0) { // if all consumer is removed
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows
if (pSub) { if (pSub) {
taosRLockLatch(&pSub->lock); taosRLockLatch(&pSub->lock);
...@@ -501,7 +501,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -501,7 +501,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
taosRUnLockLatch(&pSub->lock); taosRUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
} // }
} }
// 8. generate logs // 8. generate logs
...@@ -771,8 +771,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { ...@@ -771,8 +771,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
} }
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
SMDropCgroupReq dropReq = {0}; SMDropCgroupReq dropReq = {0};
STrans *pTrans = NULL;
int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { if (tDeserializeSMDropCgroupReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
...@@ -791,38 +793,40 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { ...@@ -791,38 +793,40 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) {
} }
} }
taosWLockLatch(&pSub->lock);
if (taosHashGetSize(pSub->consumerHash) != 0) { if (taosHashGetSize(pSub->consumerHash) != 0) {
terrno = TSDB_CODE_MND_CGROUP_USED; terrno = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub); code = -1;
return -1; goto end;
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup"); pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "drop-cgroup");
if (pTrans == NULL) { if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub); code = -1;
mndTransDrop(pTrans); goto end;
return -1;
} }
mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic); mInfo("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr()); mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub); code = -1;
mndTransDrop(pTrans); goto end;
return -1;
} }
if (mndTransPrepare(pMnode, pTrans) < 0) { if (mndTransPrepare(pMnode, pTrans) < 0) {
mndReleaseSubscribe(pMnode, pSub); code = -1;
mndTransDrop(pTrans); goto end;
return -1;
} }
end:
taosWUnLockLatch(&pSub->lock);
mndReleaseSubscribe(pMnode, pSub); mndReleaseSubscribe(pMnode, pSub);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS; return code;
} }
void mndCleanupSubscribe(SMnode *pMnode) {} void mndCleanupSubscribe(SMnode *pMnode) {}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册