未验证 提交 2939e17a 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #13065 from taosdata/feature/stream

feat(tmq): drop cgroup
......@@ -1664,6 +1664,10 @@ typedef struct {
int32_t tSerializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
int32_t tDeserializeSMDropCgroupReq(void* buf, int32_t bufLen, SMDropCgroupReq* pReq);
typedef struct {
int8_t reserved;
} SMDropCgroupRsp;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
......
......@@ -151,6 +151,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMqConsumerLostMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_RECOVER, "mnode-mq-consumer-recover", SMqConsumerRecoverMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DROP_CGROUP, "mnode-mq-drop-cgroup", SMqDropCGroupReq, SMqDropCGroupRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_COMMIT_OFFSET, "mnode-mq-commit-offset", SMqCMCommitOffsetReq, SMqCMCommitOffsetRsp)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "mnode-create-stream", SCMCreateStreamReq, SCMCreateStreamRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "mnode-alter-stream", NULL, NULL)
......
......@@ -268,6 +268,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
#define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC)
// mnode-stream
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
......
......@@ -94,6 +94,7 @@ typedef enum {
TRN_TYPE_ALTER_STREAM = 1027,
TRN_TYPE_CONSUMER_LOST = 1028,
TRN_TYPE_CONSUMER_RECOVER = 1029,
TRN_TYPE_DROP_CGROUP = 1030,
TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000,
......
......@@ -39,6 +39,7 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey);
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic);
......
......@@ -33,6 +33,7 @@ int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub);
#ifdef __cplusplus
}
......
......@@ -58,6 +58,12 @@ bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) {
return false;
}
bool mndOffsetFromSubKey(SMqOffsetObj *pOffset, const char *subKey) {
int32_t i = 0;
while (pOffset->key[i] != ':') i++;
if (strcmp(&pOffset->key[i + 1], subKey) == 0) return true;
return false;
}
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL;
......@@ -303,7 +309,35 @@ int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic)
continue;
}
if (mndSetDropOffsetRedoLogs(pMnode, pTrans, pOffset) < 0) {
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END;
}
sdbRelease(pSdb, pOffset);
}
code = 0;
END:
return code;
}
int32_t mndDropOffsetBySubKey(SMnode *pMnode, STrans *pTrans, const char *subKey) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqOffsetObj *pOffset = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
if (pIter == NULL) break;
if (!mndOffsetFromSubKey(pOffset, subKey)) {
sdbRelease(pSdb, pOffset);
continue;
}
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END;
}
......
......@@ -42,6 +42,7 @@ static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *);
static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub);
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pMsg);
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
......@@ -75,6 +76,8 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
......@@ -581,6 +584,57 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
return 0;
}
static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
/*SSdb *pSdb = pMnode->pSdb;*/
SMDropCgroupReq dropReq = {0};
if (tDeserializeSMDropCgroupReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, dropReq.cgroup, dropReq.topic);
if (pSub == NULL) {
if (dropReq.igNotExists) {
mDebug("cgroup:%s on topic:%s, not exist, ignore not exist is set", dropReq.cgroup, dropReq.topic);
return 0;
} else {
terrno = TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST;
mError("topic:%s, cgroup:%s, failed to drop since %s", dropReq.topic, dropReq.cgroup, terrstr());
return -1;
}
}
if (taosHashGetSize(pSub->consumerHash) == 0) {
terrno = TSDB_CODE_MND_CGROUP_USED;
mError("cgroup:%s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_CGROUP, pReq);
if (pTrans == NULL) {
mError("cgroup: %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
return -1;
}
mDebug("trans:%d, used to drop cgroup:%s on topic %s", pTrans->id, dropReq.cgroup, dropReq.topic);
if (mndDropOffsetBySubKey(pMnode, pTrans, pSub->key) < 0) {
ASSERT(0);
return -1;
}
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
mError("cgroup %s on topic:%s, failed to drop since %s", dropReq.cgroup, dropReq.topic, terrstr());
return -1;
}
mndReleaseSubscribe(pMnode, pSub);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
void mndCleanupSubscribe(SMnode *pMnode) {}
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
......@@ -735,7 +789,7 @@ static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscrib
return 0;
}
static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册