未验证 提交 f4b695c8 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #15945 from taosdata/feature/stream

enh(tmq): prevent drop db when topic exists
rust @ 7ed7a977
Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12
......@@ -284,11 +284,13 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB)
#define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC)
#define TSDB_CODE_MND_TOPIC_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03ED)
// mnode-stream
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
#define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1)
#define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2)
#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3)
// mnode-sma
#define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480)
......
......@@ -204,7 +204,7 @@ typedef struct {
typedef struct {
SMqCommitCbParamSet* params;
STqOffset* pOffset;
} SMqCommitCbParam2;
} SMqCommitCbParam;
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
......@@ -368,8 +368,8 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
return sprintf(dst, "%s:%d", topicName, vg);
}
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param;
int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// push into array
#if 0
......@@ -440,7 +440,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
tEncodeSTqOffset(&encoder, pOffset);
// build param
SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
pParam->params = pParamSet;
pParam->pOffset = pOffset;
......@@ -465,7 +465,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
pMsgSendInfo->requestObjRefId = 0;
pMsgSendInfo->param = pParam;
pMsgSendInfo->paramFreeFp = taosMemoryFree;
pMsgSendInfo->fp = tmqCommitCb2;
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
// send msg
......@@ -1313,17 +1313,6 @@ int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
#endif
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
/*int64_t reqOffset;*/
/*if (pVg->currentOffset >= 0) {*/
/*reqOffset = pVg->currentOffset;*/
/*} else {*/
/*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/
/*tscError("unable to poll since no committed offset but reset offset is set to none");*/
/*return NULL;*/
/*}*/
/*reqOffset = tmq->resetOffsetCfg;*/
/*}*/
SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
if (pReq == NULL) {
return NULL;
......
......@@ -32,6 +32,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb);
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
......
......@@ -995,11 +995,13 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER;
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER;
......@@ -1706,7 +1708,7 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) {
static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj;
int32_t *numOfTables = p1;
int64_t uid = *(int64_t*)p2;
int64_t uid = *(int64_t *)p2;
if (pVgroup->dbUid == uid) {
*numOfTables += pVgroup->numOfTables;
}
......
......@@ -837,7 +837,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
sdbCancelFetch(pSdb, pIter);
mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64,
pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid);
terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST;
terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
return -1;
} else {
#if 0
......
......@@ -782,6 +782,26 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
sdbCancelFetch(pSdb, pIter);
}
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMqTopicObj *pTopic = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
if (pIter == NULL) break;
if (pTopic->dbUid == pDb->uid) {
sdbRelease(pSdb, pTopic);
terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
return -1;
}
sdbRelease(pSdb, pTopic);
}
return 0;
}
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -286,6 +286,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_MUST_BE_DELETED, "Topic must be dropped first")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
// mnode-stream
......
taos-tools @ 3c7dafee
Subproject commit 3c7dafeea3e558968165b73bee0f51024898e3da
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册