diff --git a/examples/rust b/examples/rust deleted file mode 160000 index 7ed7a97715388fa144718764d6bf20f9bfc29a12..0000000000000000000000000000000000000000 --- a/examples/rust +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 7ed7a97715388fa144718764d6bf20f9bfc29a12 diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ce73dceec22a51e869f331c918e8775fdbf357b1..3ca6978156bb99d40245bd89c09981786c3b8d46 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -284,11 +284,13 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EA) #define TSDB_CODE_MND_TOPIC_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x03EB) #define TSDB_CODE_MND_CGROUP_USED TAOS_DEF_ERROR_CODE(0, 0x03EC) +#define TSDB_CODE_MND_TOPIC_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03ED) // mnode-stream #define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0) #define TSDB_CODE_MND_STREAM_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F1) #define TSDB_CODE_MND_INVALID_STREAM_OPTION TAOS_DEF_ERROR_CODE(0, 0x03F2) +#define TSDB_CODE_MND_STREAM_MUST_BE_DELETED TAOS_DEF_ERROR_CODE(0, 0x03F3) // mnode-sma #define TSDB_CODE_MND_SMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0480) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index c4375c85b4fc7eaf774fdf3bb4aa1ffde11cfa06..c0874caf950de1bbcfadef4ce0c5bc9409450dc3 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -204,7 +204,7 @@ typedef struct { typedef struct { SMqCommitCbParamSet* params; STqOffset* pOffset; -} SMqCommitCbParam2; +} SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); @@ -368,8 +368,8 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { return sprintf(dst, "%s:%d", topicName, vg); } -int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) { - SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param; +int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { + SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; // push into array #if 0 @@ -440,7 +440,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT tEncodeSTqOffset(&encoder, pOffset); // build param - SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2)); + SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam)); pParam->params = pParamSet; pParam->pOffset = pOffset; @@ -465,7 +465,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT pMsgSendInfo->requestObjRefId = 0; pMsgSendInfo->param = pParam; pMsgSendInfo->paramFreeFp = taosMemoryFree; - pMsgSendInfo->fp = tmqCommitCb2; + pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; // send msg @@ -1313,17 +1313,6 @@ int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { #endif SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { - /*int64_t reqOffset;*/ - /*if (pVg->currentOffset >= 0) {*/ - /*reqOffset = pVg->currentOffset;*/ - /*} else {*/ - /*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/ - /*tscError("unable to poll since no committed offset but reset offset is set to none");*/ - /*return NULL;*/ - /*}*/ - /*reqOffset = tmq->resetOffsetCfg;*/ - /*}*/ - SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq)); if (pReq == NULL) { return NULL; diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index c5c4800e0295fa48ee4bf9669200f7ce7a31eff8..9f516904ce816a6c7999d44ed003d064f36e95c5 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -32,6 +32,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic); SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb); +int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb); const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 89ea7abc23be102210e36450302d5aa30c49e6a6..9dde083f5054c6557f7dd85049133f44c46522f6 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -995,11 +995,13 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) { mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name); mndTransSetDbName(pTrans, pDb->name, NULL); + if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER; + if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER; - if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER; - if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER; - if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER; + /*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ + /*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ + /*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/ if (mndDropStreamByDb(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndDropSmasByDb(pMnode, pTrans, pDb) != 0) goto _OVER; if (mndSetDropDbRedoActions(pMnode, pTrans, pDb) != 0) goto _OVER; @@ -1706,7 +1708,7 @@ static void setPerfSchemaDbCfg(SDbObj *pDbObj) { static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) { SVgObj *pVgroup = pObj; int32_t *numOfTables = p1; - int64_t uid = *(int64_t*)p2; + int64_t uid = *(int64_t *)p2; if (pVgroup->dbUid == uid) { *numOfTables += pVgroup->numOfTables; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 7bde05aca5047195ed02a88ccb01e849daed2646..0f20188033b3f97c850d5a9f0075fc0fa8c58e6d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -837,7 +837,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { sdbCancelFetch(pSdb, pIter); mError("db:%s, failed to drop stream:%s since sourceDbUid:%" PRId64 " not match with targetDbUid:%" PRId64, pDb->name, pStream->name, pStream->sourceDbUid, pStream->targetDbUid); - terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; + terrno = TSDB_CODE_MND_STREAM_MUST_BE_DELETED; return -1; } else { #if 0 diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index f343c2e055ed3f357a6a9cee6e3e1c8025d055cc..820bb4b636bf5ecdebdb55ead48eb9c7ee5aeca5 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -782,6 +782,26 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) { sdbCancelFetch(pSdb, pIter); } +int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; + + void *pIter = NULL; + SMqTopicObj *pTopic = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); + if (pIter == NULL) break; + + if (pTopic->dbUid == pDb->uid) { + sdbRelease(pSdb, pTopic); + terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED; + return -1; + } + + sdbRelease(pSdb, pTopic); + } + return 0; +} + int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t code = 0; SSdb *pSdb = pMnode->pSdb; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f0a58c9bdc34cd307f499749ed36babfd95c6b0b..bb26f9b2f7fe8f69e7f4d7454e414abfcd6cccfb 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -286,6 +286,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_MUST_BE_DELETED, "Topic must be dropped first") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer") // mnode-stream diff --git a/tools/taos-tools b/tools/taos-tools deleted file mode 160000 index 3c7dafeea3e558968165b73bee0f51024898e3da..0000000000000000000000000000000000000000 --- a/tools/taos-tools +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 3c7dafeea3e558968165b73bee0f51024898e3da