提交 44a96954 编写于 作者: L Liu Jicong

enh(tmq): cascade drop

上级 b3fea32a
...@@ -281,6 +281,7 @@ int32_t* taosGetErrno(); ...@@ -281,6 +281,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E8) #define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E8)
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9) #define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA) #define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
// mnode-stream // mnode-stream
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0) #define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
......
...@@ -459,6 +459,7 @@ typedef struct { ...@@ -459,6 +459,7 @@ typedef struct {
char* ast; char* ast;
char* physicalPlan; char* physicalPlan;
SSchemaWrapper schema; SSchemaWrapper schema;
int32_t refConsumerCnt;
} SMqTopicObj; } SMqTopicObj;
typedef struct { typedef struct {
......
...@@ -38,6 +38,9 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c ...@@ -38,6 +38,9 @@ static FORCE_INLINE int32_t mndMakePartitionKey(char *key, const char *cgroup, c
} }
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -32,6 +32,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); ...@@ -32,6 +32,7 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName); int32_t mndMakeSubscribeKey(char *key, const char *cgroup, const char *topicName);
int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topic);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -35,6 +35,8 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); ...@@ -35,6 +35,8 @@ int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -399,6 +399,9 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { ...@@ -399,6 +399,9 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
int32_t newTopicNum = taosArrayGetSize(newSub); int32_t newTopicNum = taosArrayGetSize(newSub);
// check topic existance // check topic existance
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
for (int32_t i = 0; i < newTopicNum; i++) { for (int32_t i = 0; i < newTopicNum; i++) {
char *topic = taosArrayGetP(newSub, i); char *topic = taosArrayGetP(newSub, i);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
...@@ -406,7 +409,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { ...@@ -406,7 +409,14 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
goto SUBSCRIBE_OVER; goto SUBSCRIBE_OVER;
} }
// TODO lock topic to prevent drop
// ref topic to prevent drop
// TODO make topic complete
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
} }
...@@ -422,8 +432,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { ...@@ -422,8 +432,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy); taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
...@@ -494,8 +502,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { ...@@ -494,8 +502,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
goto SUBSCRIBE_OVER; goto SUBSCRIBE_OVER;
} }
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_SUBSCRIBE, &pMsg->rpcMsg);
if (pTrans == NULL) goto SUBSCRIBE_OVER;
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER; if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto SUBSCRIBE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto SUBSCRIBE_OVER;
} }
...@@ -503,6 +509,8 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) { ...@@ -503,6 +509,8 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS; code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
SUBSCRIBE_OVER: SUBSCRIBE_OVER:
mndTransDrop(pTrans);
if (pConsumerOld) { if (pConsumerOld) {
/*taosRUnLockLatch(&pConsumerOld->lock);*/ /*taosRUnLockLatch(&pConsumerOld->lock);*/
mndReleaseConsumer(pMnode, pConsumerOld); mndReleaseConsumer(pMnode, pConsumerOld);
......
...@@ -50,6 +50,14 @@ int32_t mndInitOffset(SMnode *pMnode) { ...@@ -50,6 +50,14 @@ int32_t mndInitOffset(SMnode *pMnode) {
void mndCleanupOffset(SMnode *pMnode) {} void mndCleanupOffset(SMnode *pMnode) {}
bool mndOffsetFromTopic(SMqOffsetObj *pOffset, const char *topic) {
int32_t i = 0;
while (pOffset->key[i] != ':') i++;
while (pOffset->key[i] != ':') i++;
if (strcmp(&pOffset->key[i + 1], topic) == 0) return true;
return false;
}
SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) { SSdbRaw *mndOffsetActionEncode(SMqOffsetObj *pOffset) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
void *buf = NULL; void *buf = NULL;
...@@ -134,10 +142,11 @@ int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicNa ...@@ -134,10 +142,11 @@ int32_t mndCreateOffsets(STrans *pTrans, const char *cgroup, const char *topicNa
int32_t sz = taosArrayGetSize(vgs); int32_t sz = taosArrayGetSize(vgs);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(vgs, i); int32_t vgId = *(int32_t *)taosArrayGet(vgs, i);
SMqOffsetObj offsetObj; SMqOffsetObj offsetObj = {0};
if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, vgId) < 0) { if (mndMakePartitionKey(offsetObj.key, cgroup, topicName, vgId) < 0) {
return -1; return -1;
} }
// TODO assign db
offsetObj.offset = -1; offsetObj.offset = -1;
SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj); SSdbRaw *pOffsetRaw = mndOffsetActionEncode(&offsetObj);
if (pOffsetRaw == NULL) { if (pOffsetRaw == NULL) {
...@@ -240,6 +249,14 @@ static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOff ...@@ -240,6 +249,14 @@ static int32_t mndSetDropOffsetCommitLogs(SMnode *pMnode, STrans *pTrans, SMqOff
return 0; return 0;
} }
static int32_t mndSetDropOffsetRedoLogs(SMnode *pMnode, STrans *pTrans, SMqOffsetObj *pOffset) {
SSdbRaw *pRedoRaw = mndOffsetActionEncode(pOffset);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = -1; int32_t code = -1;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -247,7 +264,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { ...@@ -247,7 +264,7 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
void *pIter = NULL; void *pIter = NULL;
SMqOffsetObj *pOffset = NULL; SMqOffsetObj *pOffset = NULL;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pOffset); pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pOffset->dbUid != pDb->uid) { if (pOffset->dbUid != pDb->uid) {
...@@ -256,8 +273,39 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { ...@@ -256,8 +273,39 @@ int32_t mndDropOffsetByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
} }
if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) { if (mndSetDropOffsetCommitLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END;
}
sdbRelease(pSdb, pOffset);
}
code = 0;
END:
return code;
}
int32_t mndDropOffsetByTopic(SMnode *pMnode, STrans *pTrans, const char *topic) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqOffsetObj *pOffset = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_OFFSET, pIter, (void **)&pOffset);
if (pIter == NULL) break;
if (!mndOffsetFromTopic(pOffset, topic)) {
sdbRelease(pSdb, pOffset);
continue;
}
if (mndSetDropOffsetRedoLogs(pMnode, pTrans, pOffset) < 0) {
sdbRelease(pSdb, pOffset);
goto END; goto END;
} }
sdbRelease(pSdb, pOffset);
} }
code = 0; code = 0;
......
...@@ -389,7 +389,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -389,7 +389,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebOutputObj *pOutput) { static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_REBALANCE, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_REBALANCE, &pMsg->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {
return -1; return -1;
} }
...@@ -458,6 +458,20 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO ...@@ -458,6 +458,20 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SNodeMsg *pMsg, const SMqRebO
goto REB_FAIL; goto REB_FAIL;
} }
} }
if (consumerNum) {
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pOutput->pSub->key, topic, cgroup, true);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
if (pTopic) {
// TODO make topic complete
SMqTopicObj topicObj = {0};
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
if (mndSetTopicRedoLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
}
}
// 4. TODO commit log: modification log // 4. TODO commit log: modification log
// 5. set cb // 5. set cb
...@@ -688,6 +702,14 @@ static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) { ...@@ -688,6 +702,14 @@ static int32_t mndProcessSubscribeInternalRsp(SNodeMsg *pRsp) {
return 0; return 0;
} }
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) { static int32_t mndSetDropSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pCommitRaw = mndSubActionEncode(pSub); SSdbRaw *pCommitRaw = mndSubActionEncode(pSub);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
...@@ -712,6 +734,38 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { ...@@ -712,6 +734,38 @@ int32_t mndDropSubByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
} }
if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) { if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub);
goto END;
}
}
code = 0;
END:
return code;
}
int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) {
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqSubscribeObj *pSub = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, pIter, (void **)&pSub);
if (pIter == NULL) break;
char topic[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CGROUP_LEN];
mndSplitSubscribeKey(pSub->key, topic, cgroup, true);
if (strcmp(topic, topicName) != 0) {
sdbRelease(pSdb, pSub);
continue;
}
// iter all vnode to delete handle
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub);
goto END; goto END;
} }
} }
......
...@@ -18,8 +18,10 @@ ...@@ -18,8 +18,10 @@
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
...@@ -106,6 +108,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { ...@@ -106,6 +108,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema); taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, schemaLen, TOPIC_ENCODE_OVER);
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
...@@ -190,6 +193,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { ...@@ -190,6 +193,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
goto TOPIC_DECODE_OVER; goto TOPIC_DECODE_OVER;
} }
SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
...@@ -220,11 +225,13 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic ...@@ -220,11 +225,13 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime); atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
atomic_exchange_32(&pOldTopic->version, pNewTopic->version); atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
taosWLockLatch(&pOldTopic->lock); atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);
/*taosWLockLatch(&pOldTopic->lock);*/
// TODO handle update // TODO handle update
taosWUnLockLatch(&pOldTopic->lock); /*taosWUnLockLatch(&pOldTopic->lock);*/
return 0; return 0;
} }
...@@ -292,6 +299,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq ...@@ -292,6 +299,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.version = 1; topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql); topicObj.sql = strdup(pCreate->sql);
topicObj.sqlLen = strlen(pCreate->sql) + 1; topicObj.sqlLen = strlen(pCreate->sql) + 1;
topicObj.refConsumerCnt = 0;
if (pCreate->ast && pCreate->ast[0]) { if (pCreate->ast && pCreate->ast[0]) {
topicObj.ast = strdup(pCreate->ast); topicObj.ast = strdup(pCreate->ast);
...@@ -436,15 +444,7 @@ CREATE_TOPIC_OVER: ...@@ -436,15 +444,7 @@ CREATE_TOPIC_OVER:
return code; return code;
} }
static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) { static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SNodeMsg *pReq, SMqTopicObj *pTopic) {
// TODO: cannot drop when subscribed
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
return -1;
}
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
...@@ -465,6 +465,7 @@ static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic) ...@@ -465,6 +465,7 @@ static int32_t mndDropTopic(SMnode *pMnode, SNodeMsg *pReq, SMqTopicObj *pTopic)
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) { static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb;
SMDropTopicReq dropReq = {0}; SMDropTopicReq dropReq = {0};
if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { if (tDeserializeSMDropTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
...@@ -485,10 +486,35 @@ static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) { ...@@ -485,10 +486,35 @@ static int32_t mndProcessDropTopicReq(SNodeMsg *pReq) {
return -1; return -1;
} }
} }
// TODO: check ref
int32_t code = mndDropTopic(pMnode, pReq, pTopic); // check ref
// TODO: iterate and drop related subscriptions and offsets if (pTopic->refConsumerCnt != 0) {
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_TOPIC, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
return -1;
}
mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name);
#if 1
if (mndDropOffsetByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0);
return -1;
}
#endif
if (mndDropSubByTopic(pMnode, pTrans, dropReq.name) < 0) {
ASSERT(0);
return -1;
}
int32_t code = mndDropTopic(pMnode, pTrans, pReq, pTopic);
mndReleaseTopic(pMnode, pTopic); mndReleaseTopic(pMnode, pTopic);
if (code != 0) { if (code != 0) {
...@@ -577,6 +603,15 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -577,6 +603,15 @@ static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
return numOfRows; return numOfRows;
} }
int32_t mndSetTopicRedoLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
return 0;
}
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) { static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic) {
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic); SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
......
...@@ -285,6 +285,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid qu ...@@ -285,6 +285,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid qu
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer waiting for rebalance") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer waiting for rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
// mnode-sma // mnode-sma
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册