diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 035a767b87fd3e91bfa7d83e473db167c68b0d6f..1d4e0489f6ef21be43e1f3d952d98ec8789e2cf9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1083,15 +1083,16 @@ typedef struct { } STaskDropRsp; typedef struct { - int8_t igExists; - char* name; - char* sql; - char* physicalPlan; - char* logicalPlan; + char name[TSDB_TOPIC_FNAME_LEN]; + int8_t igExists; + char* sql; + char* physicalPlan; + char* logicalPlan; } SMCreateTopicReq; -int32_t tSerializeMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq); -void* tDeserializeSMCreateTopicReq(void* buf, SMCreateTopicReq* pReq); +int32_t tSerializeMCreateTopicReq(void* buf, int32_t bufLen, const SMCreateTopicReq* pReq); +int32_t tDeserializeSMCreateTopicReq(void* buf, int32_t bufLen, SMCreateTopicReq* pReq); +void tFreeSMCreateTopicReq(SMCreateTopicReq* pReq); typedef struct { int64_t topicId; @@ -1262,7 +1263,7 @@ typedef struct { int8_t igNotExists; } SMDropTopicReq; -int32_t tSerializeSMDropTopicReqq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); +int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq); typedef struct { diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 96872c53d57dc532aedbe842cf9523e050e2f950..b54a57373eba2ac524962112e8293bb5edd0119e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -361,21 +361,20 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i tNameExtractFullName(&name, topicFname); SMCreateTopicReq req = { - .name = (char*)topicFname, .igExists = 1, .physicalPlan = (char*)pStr, .sql = (char*)sql, .logicalPlan = (char*)"no logic plan", }; + memcpy(req.name, topicName, TSDB_TOPIC_FNAME_LEN); - int tlen = tSerializeMCreateTopicReq(NULL, &req); + int tlen = tSerializeMCreateTopicReq(NULL, 0, &req); void* buf = malloc(tlen); if (buf == NULL) { goto _return; } - void* abuf = buf; - tSerializeMCreateTopicReq(&abuf, &req); + tSerializeMCreateTopicReq(buf, tlen, &req); /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 611416499f8d5f07f4790c1f28e772015c9054d6..e37aa72fe26b6f5240b1471ae18e8a709589ca23 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1858,7 +1858,7 @@ int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq return 0; } -int32_t tSerializeSMDropTopicReqq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) { +int32_t tSerializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pReq) { SCoder encoder = {0}; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); @@ -1885,23 +1885,67 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR return 0; } -int32_t tSerializeMCreateTopicReq(void **buf, const SMCreateTopicReq *pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI8(buf, pReq->igExists); - tlen += taosEncodeString(buf, pReq->name); - tlen += taosEncodeString(buf, pReq->sql); - tlen += taosEncodeString(buf, pReq->physicalPlan); - tlen += taosEncodeString(buf, pReq->logicalPlan); +int32_t tSerializeMCreateTopicReq(void *buf, int32_t bufLen, const SMCreateTopicReq *pReq) { + int32_t sqlLen = 0; + int32_t physicalPlanLen = 0; + int32_t logicalPlanLen = 0; + if (pReq->sql != NULL) sqlLen = (int32_t)strlen(pReq->sql); + if (pReq->physicalPlan != NULL) physicalPlanLen = (int32_t)strlen(pReq->physicalPlan); + if (pReq->logicalPlan != NULL) logicalPlanLen = (int32_t)strlen(pReq->logicalPlan); + + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; + if (tEncodeI32(&encoder, sqlLen) < 0) return -1; + if (tEncodeI32(&encoder, physicalPlanLen) < 0) return -1; + if (tEncodeI32(&encoder, logicalPlanLen) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->physicalPlan) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->logicalPlan) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); return tlen; } -void *tDeserializeSMCreateTopicReq(void *buf, SMCreateTopicReq *pReq) { - buf = taosDecodeFixedI8(buf, &(pReq->igExists)); - buf = taosDecodeString(buf, &(pReq->name)); - buf = taosDecodeString(buf, &(pReq->sql)); - buf = taosDecodeString(buf, &(pReq->physicalPlan)); - buf = taosDecodeString(buf, &(pReq->logicalPlan)); - return buf; +int32_t tDeserializeSMCreateTopicReq(void *buf, int32_t bufLen, SMCreateTopicReq *pReq) { + int32_t sqlLen = 0; + int32_t physicalPlanLen = 0; + int32_t logicalPlanLen = 0; + + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igExists) < 0) return -1; + if (tDecodeI32(&decoder, &sqlLen) < 0) return -1; + if (tDecodeI32(&decoder, &physicalPlanLen) < 0) return -1; + if (tDecodeI32(&decoder, &logicalPlanLen) < 0) return -1; + + pReq->sql = calloc(1, sqlLen + 1); + pReq->physicalPlan = calloc(1, physicalPlanLen + 1); + pReq->logicalPlan = calloc(1, logicalPlanLen + 1); + if (pReq->sql == NULL || pReq->physicalPlan == NULL || pReq->logicalPlan == NULL) return -1; + + if (tDecodeCStrTo(&decoder, pReq->sql) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->physicalPlan) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->logicalPlan) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +void tFreeSMCreateTopicReq(SMCreateTopicReq *pReq) { + tfree(pReq->sql); + tfree(pReq->physicalPlan); + tfree(pReq->logicalPlan); } int32_t tSerializeSMCreateTopicRsp(void *buf, int32_t bufLen, const SMCreateTopicRsp *pRsp) { diff --git a/source/dnode/mgmt/impl/test/sut/src/sut.cpp b/source/dnode/mgmt/impl/test/sut/src/sut.cpp index 806d2a5f344a56a124a3488e0ba7e3768cd778d4..09a738be3b8a5ddabe2aa2ba1080db749b9ed7dd 100644 --- a/source/dnode/mgmt/impl/test/sut/src/sut.cpp +++ b/source/dnode/mgmt/impl/test/sut/src/sut.cpp @@ -99,6 +99,8 @@ void Testbase::SendShowMetaReq(int8_t showType, const char* db) { SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW, pReq, contLen); ASSERT(pRsp->pCont != nullptr); + if (pRsp->contLen == 0) return; + SShowRsp showRsp = {0}; tDeserializeSShowRsp(pRsp->pCont, pRsp->contLen, &showRsp); tFreeSTableMetaRsp(&metaRsp); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 31d85d2b7265b96c04621681df714330561ca3e2..53ffd8698fd4bdf350cb9b437bfc726e19367f4f 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1387,8 +1387,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int } static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) { - SSdb *pSdb = pMnode->pSdb; - + SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -1402,7 +1401,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb); if (pIter == NULL) break; - if (strcmp(pStb->db, dbName) == 0) { + if (pStb->dbUid == pDb->uid) { numOfStbs++; } @@ -1410,6 +1409,7 @@ static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs } *pNumOfStbs = numOfStbs; + mndReleaseDb(pMnode, pDb); return 0; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 040b7c58307ac3155104c5bf76ddef594afd5e7f..e7308b833e9c27ca568bfc4b97e5480dafc8f0e0 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "mndTopic.h" +#include "mndAuth.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" @@ -52,6 +53,10 @@ int32_t mndInitTopic(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_TP, mndGetTopicMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TP, mndRetrieveTopic); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TP, mndCancelGetNextTopic); + return sdbSetTable(pMnode->pSdb, table); } @@ -78,7 +83,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER); SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER); - SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); @@ -187,7 +191,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic SMqTopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) { SSdb *pSdb = pMnode->pSdb; SMqTopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName); - if (pTopic == NULL) { + if (pTopic == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; } return pTopic; @@ -225,8 +229,11 @@ static SDDropTopicReq *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgroup, SMq return pDrop; } -static int32_t mndCheckCreateTopicMsg(SMCreateTopicReq *creattopReq) { - // deserialize and other stuff +static int32_t mndCheckCreateTopicReq(SMCreateTopicReq *pCreate) { + if (pCreate->name[0] == 0 || pCreate->sql == NULL || pCreate->sql[0] == 0) { + terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION; + return -1; + } return 0; } @@ -245,69 +252,122 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pReq, SMCreateTopicReq topicObj.logicalPlan = pCreate->logicalPlan; topicObj.sqlLen = strlen(pCreate->sql); - SSdbRaw *pTopicRaw = mndTopicActionEncode(&topicObj); - if (pTopicRaw == NULL) return -1; - if (sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY) != 0) return -1; - /*STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pReq->rpcMsg);*/ - /*mndTransAppendRedolog(pTrans, pTopicRaw);*/ - /*if (mndTransPrepare(pMnode, pTrans) != 0) {*/ - /*mError("mq-createTopic-trans:%d, failed to prepare since %s", pTrans->id, terrstr());*/ - /*mndTransDrop(pTrans);*/ - /*return -1;*/ - /*}*/ - /*mndTransDrop(pTrans);*/ - /*return 0;*/ - return sdbWrite(pMnode->pSdb, pTopicRaw); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + if (pTrans == NULL) { + mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); + return -1; + } + mDebug("trans:%d, used to create topic:%s", pTrans->id, pCreate->name); + + SSdbRaw *pRedoRaw = mndTopicActionEncode(&topicObj); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; } static int32_t mndProcessCreateTopicReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - char *msgStr = pReq->rpcMsg.pCont; - + SMnode *pMnode = pReq->pMnode; + int32_t code = -1; + SMqTopicObj *pTopic = NULL; + SDbObj *pDb = NULL; + SUserObj *pUser = NULL; SMCreateTopicReq createTopicReq = {0}; - tDeserializeSMCreateTopicReq(msgStr, &createTopicReq); + + if (tDeserializeSMCreateTopicReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createTopicReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto CREATE_TOPIC_OVER; + } mDebug("topic:%s, start to create, sql:%s", createTopicReq.name, createTopicReq.sql); - if (mndCheckCreateTopicMsg(&createTopicReq) != 0) { + if (mndCheckCreateTopicReq(&createTopicReq) != 0) { mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); - return -1; + goto CREATE_TOPIC_OVER; } - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, createTopicReq.name); + pTopic = mndAcquireTopic(pMnode, createTopicReq.name); if (pTopic != NULL) { - sdbRelease(pMnode->pSdb, pTopic); if (createTopicReq.igExists) { mDebug("topic:%s, already exist, ignore exist is set", createTopicReq.name); - return 0; + code = 0; + goto CREATE_TOPIC_OVER; } else { terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST; - mError("topic:%s, failed to create since already exists", createTopicReq.name); - return -1; + goto CREATE_TOPIC_OVER; } + } else if (terrno != TSDB_CODE_MND_TOPIC_NOT_EXIST) { + goto CREATE_TOPIC_OVER; } - SDbObj *pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name); + pDb = mndAcquireDbByTopic(pMnode, createTopicReq.name); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + goto CREATE_TOPIC_OVER; + } + + pUser = mndAcquireUser(pMnode, pReq->user); + if (pUser == NULL) { + goto CREATE_TOPIC_OVER; + } + + if (mndCheckWriteAuth(pUser, pDb) != 0) { + goto CREATE_TOPIC_OVER; + } + + code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb); + if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; + +CREATE_TOPIC_OVER: + if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); - return -1; } - int32_t code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb); + mndReleaseTopic(pMnode, pTopic); mndReleaseDb(pMnode, pDb); + mndReleaseUser(pMnode, pUser); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr()); + tFreeSMCreateTopicReq(&createTopicReq); + return code; +} + +static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pReq->rpcMsg); + if (pTrans == NULL) { + mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); + return -1; + } + mDebug("trans:%d, used to drop topic:%s", pTrans->id, pTopic->name); + + SSdbRaw *pRedoRaw = mndTopicActionEncode(pTopic); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); return -1; } + mndTransDrop(pTrans); return 0; } -static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pReq, SMqTopicObj *pTopic) { return 0; } - static int32_t mndProcessDropTopicReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; SMDropTopicReq dropReq = {0}; @@ -419,8 +479,7 @@ static int32_t mndProcessTopicMetaReq(SMnodeMsg *pReq) { } static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) { - SSdb *pSdb = pMnode->pSdb; - + SSdb *pSdb = pMnode->pSdb; SDbObj *pDb = mndAcquireDb(pMnode, dbName); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -434,12 +493,15 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); if (pIter == NULL) break; - numOfTopics++; + if (pTopic->dbUid == pDb->uid) { + numOfTopics++; + } sdbRelease(pSdb, pTopic); } *pNumOfTopics = numOfTopics; + mndReleaseDb(pMnode, pDb); return 0; } @@ -466,6 +528,12 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp * pSchema[cols].bytes = pShow->bytes[cols]; cols++; + pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql"); + pSchema[cols].bytes = pShow->bytes[cols]; + cols++; + pMeta->numOfColumns = cols; pShow->numOfColumns = cols; @@ -522,6 +590,10 @@ static int32_t mndRetrieveTopic(SMnodeMsg *pReq, SShowObj *pShow, char *data, in *(int64_t *)pWrite = pTopic->createTime; cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]); + cols++; + numOfRows++; sdbRelease(pSdb, pTopic); } diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index 5f6f2f3b4ff2f5eeed081a7839f3846642264cce..df0510f78369fde33b45cb458337d7fe8ac33a06 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -13,3 +13,4 @@ add_subdirectory(mnode) add_subdirectory(db) add_subdirectory(stb) add_subdirectory(func) +add_subdirectory(topic) diff --git a/source/dnode/mnode/impl/test/topic/CMakeLists.txt b/source/dnode/mnode/impl/test/topic/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..63a77713d6825a709bc58c4ad5ac41f68724111b --- /dev/null +++ b/source/dnode/mnode/impl/test/topic/CMakeLists.txt @@ -0,0 +1,11 @@ +aux_source_directory(. TOPIC_SRC) +add_executable(mnode_test_topic ${TOPIC_SRC}) +target_link_libraries( + mnode_test_topic + PUBLIC sut +) + +add_test( + NAME mnode_test_topic + COMMAND mnode_test_topic +) diff --git a/source/dnode/mnode/impl/test/topic/topic.cpp b/source/dnode/mnode/impl/test/topic/topic.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8a4e17d054b271ed17473cb0343830ffb34777be --- /dev/null +++ b/source/dnode/mnode/impl/test/topic/topic.cpp @@ -0,0 +1,177 @@ +/** + * @file topic.cpp + * @author slguan (slguan@taosdata.com) + * @brief MNODE module topic tests + * @version 1.0 + * @date 2022-02-16 + * + * @copyright Copyright (c) 2022 + * + */ + +#include "sut.h" + +class MndTestTopic : public ::testing::Test { + protected: + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_topic", 9039); } + static void TearDownTestSuite() { test.Cleanup(); } + + static Testbase test; + + public: + void SetUp() override {} + void TearDown() override {} + + void* BuildCreateDbReq(const char* dbname, int32_t* pContLen); + void* BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen); + void* BuildDropTopicReq(const char* topicName, int32_t* pContLen); +}; + +Testbase MndTestTopic::test; + +void* MndTestTopic::BuildCreateDbReq(const char* dbname, int32_t* pContLen) { + SCreateDbReq createReq = {0}; + strcpy(createReq.db, dbname); + createReq.numOfVgroups = 2; + createReq.cacheBlockSize = 16; + createReq.totalBlocks = 10; + createReq.daysPerFile = 10; + createReq.daysToKeep0 = 3650; + createReq.daysToKeep1 = 3650; + createReq.daysToKeep2 = 3650; + createReq.minRows = 100; + createReq.maxRows = 4096; + createReq.commitTime = 3600; + createReq.fsyncPeriod = 3000; + createReq.walLevel = 1; + createReq.precision = 0; + createReq.compression = 2; + createReq.replications = 1; + createReq.quorum = 1; + createReq.update = 0; + createReq.cacheLastRow = 0; + createReq.ignoreExist = 1; + + int32_t contLen = tSerializeSCreateDbReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSCreateDbReq(pReq, contLen, &createReq); + + *pContLen = contLen; + return pReq; +} + +void* MndTestTopic::BuildCreateTopicReq(const char* topicName, const char* sql, int32_t* pContLen) { + SMCreateTopicReq createReq = {0}; + strcpy(createReq.name, topicName); + createReq.igExists = 0; + createReq.sql = (char*)sql; + createReq.physicalPlan = (char*)"physicalPlan"; + createReq.logicalPlan = (char*)"logicalPlan"; + + int32_t contLen = tSerializeMCreateTopicReq(NULL, 0, &createReq); + void* pReq = rpcMallocCont(contLen); + tSerializeMCreateTopicReq(pReq, contLen, &createReq); + + *pContLen = contLen; + return pReq; +} + +void* MndTestTopic::BuildDropTopicReq(const char* topicName, int32_t* pContLen) { + SMDropTopicReq dropReq = {0}; + strcpy(dropReq.name, topicName); + + int32_t contLen = tSerializeSMDropTopicReq(NULL, 0, &dropReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSMDropTopicReq(pReq, contLen, &dropReq); + + *pContLen = contLen; + return pReq; +} + +TEST_F(MndTestTopic, 01_Create_Topic) { + const char* dbname = "1.d1"; + const char* topicName = "1.d1.t1"; + + { + int32_t contLen = 0; + void* pReq = BuildCreateDbReq(dbname, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DB, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, ""); + } + + { + int32_t contLen = 0; + void* pReq = BuildCreateTopicReq("t1", "sql", &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_TOPIC, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_DB_NOT_SELECTED); + } + + { + int32_t contLen = 0; + void* pReq = BuildCreateTopicReq(topicName, "sql", &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_TOPIC, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = 0; + void* pReq = BuildCreateTopicReq(topicName, "sql", &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_TOPIC, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TOPIC_ALREADY_EXIST); + } + + { + test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, dbname); + CHECK_META("show topics", 3); + + CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE, "name"); + CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); + CHECK_SCHEMA(2, TSDB_DATA_TYPE_BINARY, TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, "sql"); + + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 1); + + CheckBinary("t1", TSDB_TABLE_NAME_LEN); + CheckTimestamp(); + CheckBinary("sql", TSDB_SHOW_SQL_LEN); + + // restart + test.Restart(); + + test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, dbname); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 1); + + CheckBinary("t1", TSDB_TABLE_NAME_LEN); + CheckTimestamp(); + CheckBinary("sql", TSDB_SHOW_SQL_LEN); + } + + { + int32_t contLen = 0; + void* pReq = BuildDropTopicReq(topicName, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_TOPIC, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, 0); + } + + { + int32_t contLen = 0; + void* pReq = BuildDropTopicReq(topicName, &contLen); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_TOPIC, pReq, contLen); + ASSERT_NE(pRsp, nullptr); + ASSERT_EQ(pRsp->code, TSDB_CODE_MND_TOPIC_NOT_EXIST); + + test.SendShowMetaReq(TSDB_MGMT_TABLE_TP, dbname); + test.SendShowRetrieveReq(); + EXPECT_EQ(test.GetShowRows(), 0); + } +}