未验证 提交 c8694874 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13324 from taosdata/feature/tq

feat(tmq): support drop cgroup
......@@ -144,7 +144,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_CGROUP, "mnode-drop-cgroup", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_ASK_EP, "mnode-mq-ask-ep", SMqAskEpReq, SMqAskEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, NULL)
......
......@@ -197,6 +197,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_SUBSCRIBE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_COMMIT_OFFSET, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_ASK_EP, mmPutNodeMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_MQ_DROP_CGROUP_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -78,6 +78,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndProcessSubscribeInternalRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
......@@ -597,7 +598,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
/*SSdb *pSdb = pMnode->pSdb;*/
SSdb *pSdb = pMnode->pSdb;
SMDropCgroupReq dropReq = {0};
if (tDeserializeSMDropCgroupReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
......@@ -617,15 +618,17 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
}
}
if (taosHashGetSize(pSub->consumerHash) == 0) {
if (taosHashGetSize(pSub->consumerHash) != 0) {
terrno = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_CGROUP, pReq);
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
return -1;
}
......@@ -633,14 +636,18 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
if (mndDropOffsetBySubKey(pMnode, pTrans, pSub->key) < 0) {
ASSERT(0);
mndReleaseSubscribe(pMnode, pSub);
return -1;
}
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
mndReleaseSubscribe(pMnode, pSub);
return -1;
}
mndTransPrepare(pMnode, pTrans);
mndReleaseSubscribe(pMnode, pSub);
return TSDB_CODE_ACTION_IN_PROGRESS;
......
......@@ -3381,7 +3381,7 @@ static int32_t translateDropCGroup(STranslateContext* pCxt, SDropCGroupStmt* pSt
dropReq.igNotExists = pStmt->ignoreNotExists;
strcpy(dropReq.cgroup, pStmt->cgroup);
return buildCmdMsg(pCxt, TDMT_MND_DROP_CGROUP, (FSerializeFunc)tSerializeSMDropCgroupReq, &dropReq);
return buildCmdMsg(pCxt, TDMT_MND_MQ_DROP_CGROUP, (FSerializeFunc)tSerializeSMDropCgroupReq, &dropReq);
}
static int32_t translateAlterLocal(STranslateContext* pCxt, SAlterLocalStmt* pStmt) {
......
......@@ -270,7 +270,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid query")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer waiting for rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册