From 7750309283fc84f87ae89f6afcfe5ece27bd9ef0 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 19 Dec 2021 11:08:43 +0800 Subject: [PATCH] TD-10431 process create vnode msg --- include/util/tdef.h | 2 +- source/dnode/mgmt/impl/src/dndVnodes.c | 88 ++++--- source/dnode/mgmt/impl/test/CMakeLists.txt | 2 +- source/dnode/mgmt/impl/test/db/db.cpp | 1 + .../mgmt/impl/test/vgroup/CMakeLists.txt | 27 +++ source/dnode/mgmt/impl/test/vgroup/vgroup.cpp | 224 ++++++++++++++++++ 6 files changed, 306 insertions(+), 38 deletions(-) create mode 100644 source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt create mode 100644 source/dnode/mgmt/impl/test/vgroup/vgroup.cpp diff --git a/include/util/tdef.h b/include/util/tdef.h index 02b5a1e620..f3f3643268 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -231,7 +231,7 @@ do { \ #define TSDB_DEFAULT_PAYLOAD_SIZE 5120 // default payload size, greater than PATH_MAX value #define TSDB_EXTRA_PAYLOAD_SIZE 128 // extra bytes for auth #define TSDB_CQ_SQL_SIZE 1024 -#define TSDB_MIN_VNODES 64 +#define TSDB_MIN_VNODES 16 #define TSDB_MAX_VNODES 512 #define TSDB_MIN_VNODES_PER_DB 1 #define TSDB_MAX_VNODES_PER_DB 4096 diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index a6eb916aef..d3f1b06a4a 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -95,7 +95,7 @@ static int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj * pVnode = NULL; + SVnodeObj *pVnode = NULL; int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); @@ -107,23 +107,23 @@ static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { } taosRUnLockLatch(&pMgmt->latch); - dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); + if (pVnode != NULL) { + dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); + } + return pVnode; } static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { + if (pVnode == NULL) return; + SVnodesMgmt *pMgmt = &pDnode->vmgmt; - int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); - if (pVnode != NULL) { - refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - if (pVnode != NULL) { - dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); - } + dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) { @@ -457,7 +457,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { pMgmt->totalVnodes = numOfVnodes; - int32_t threadNum = tsNumOfCores; + int32_t threadNum = pDnode->opt.numOfCores; int32_t vnodesPerThread = numOfVnodes / threadNum + 1; SVnodeThread *threads = calloc(threadNum, sizeof(SVnodeThread)); @@ -525,33 +525,49 @@ static void dndCloseVnodes(SDnode *pDnode) { static int32_t dndParseCreateVnodeReq(SRpcMsg *rpcMsg, int32_t *vgId, SVnodeCfg *pCfg) { SCreateVnodeMsg *pCreate = rpcMsg->pCont; - *vgId = htonl(pCreate->vgId); + pCreate->vgId = htonl(pCreate->vgId); + pCreate->dnodeId = htonl(pCreate->dnodeId); + pCreate->dbUid = htobe64(pCreate->dbUid); + pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); + pCreate->totalBlocks = htonl(pCreate->totalBlocks); + pCreate->daysPerFile = htonl(pCreate->daysPerFile); + pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0); + pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1); + pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2); + pCreate->minRows = htonl(pCreate->minRows); + pCreate->maxRows = htonl(pCreate->maxRows); + pCreate->commitTime = htonl(pCreate->commitTime); + pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod); + for (int r = 0; r < pCreate->replica; ++r) { + SReplica *pReplica = &pCreate->replicas[r]; + pReplica->id = htonl(pReplica->id); + pReplica->port = htons(pReplica->port); + } + + *vgId = pCreate->vgId; #if 0 - tstrncpy(pCfg->db, pCreate->db, TSDB_FULL_DB_NAME_LEN); - pCfg->cacheBlockSize = htonl(pCreate->cacheBlockSize); - pCfg->totalBlocks = htonl(pCreate->totalBlocks); - pCfg->daysPerFile = htonl(pCreate->daysPerFile); - pCfg->daysToKeep0 = htonl(pCreate->daysToKeep0); - pCfg->daysToKeep1 = htonl(pCreate->daysToKeep1); - pCfg->daysToKeep2 = htonl(pCreate->daysToKeep2); - pCfg->minRowsPerFileBlock = htonl(pCreate->minRowsPerFileBlock); - pCfg->maxRowsPerFileBlock = htonl(pCreate->maxRowsPerFileBlock); - pCfg->precision = pCreate->precision; - pCfg->compression = pCreate->compression; - pCfg->cacheLastRow = pCreate->cacheLastRow; - pCfg->update = pCreate->update; - pCfg->quorum = pCreate->quorum; - pCfg->replica = pCreate->replica; - pCfg->walLevel = pCreate->walLevel; - pCfg->fsyncPeriod = htonl(pCreate->fsyncPeriod); - - for (int32_t i = 0; i < pCfg->replica; ++i) { - pCfg->replicas[i].port = htons(pCreate->replicas[i].port); - tstrncpy(pCfg->replicas[i].fqdn, pCreate->replicas[i].fqdn, TSDB_FQDN_LEN); - } + pCfg->wsize = pCreate->cacheBlockSize; + pCfg->ssize = pCreate->cacheBlockSize; + pCfg->wsize = pCreate->cacheBlockSize; + pCfg->lsize = pCreate->cacheBlockSize; + pCfg->isHeapAllocator = true; + pCfg->ttl = 4; + pCfg->keep = pCreate->daysToKeep0; + pCfg->isWeak = true; + pCfg->tsdbCfg.keep0 = pCreate->daysToKeep0; + pCfg->tsdbCfg.keep1 = pCreate->daysToKeep2; + pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0; + pCfg->tsdbCfg.lruCacheSize = pCreate->cacheBlockSize; + pCfg->metaCfg.lruSize = pCreate->cacheBlockSize; + pCfg->walCfg.fsyncPeriod = pCreate->fsyncPeriod; + pCfg->walCfg.level = pCreate->walLevel; + pCfg->walCfg.retentionPeriod = 10; + pCfg->walCfg.retentionSize = 128; + pCfg->walCfg.rollPeriod = 128; + pCfg->walCfg.segSize = 128; + pCfg->walCfg.vgId = pCreate->vgId; #endif - return 0; } @@ -1016,7 +1032,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { SVnodesMgmt * pMgmt = &pDnode->vmgmt; SMWorkerPool *pPool = &pMgmt->writePool; pPool->name = "vnode-write"; - pPool->max = tsNumOfCores; + pPool->max = pDnode->opt.numOfCores; if (tMWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -1050,7 +1066,7 @@ static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { } static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { - int32_t maxThreads = tsNumOfCores / 2; + int32_t maxThreads = pDnode->opt.numOfCores / 2; if (maxThreads < 1) maxThreads = 1; SVnodesMgmt *pMgmt = &pDnode->vmgmt; diff --git a/source/dnode/mgmt/impl/test/CMakeLists.txt b/source/dnode/mgmt/impl/test/CMakeLists.txt index 8c6d146fb6..a5ece72f42 100644 --- a/source/dnode/mgmt/impl/test/CMakeLists.txt +++ b/source/dnode/mgmt/impl/test/CMakeLists.txt @@ -15,6 +15,6 @@ add_subdirectory(stb) # add_subdirectory(telem) # add_subdirectory(trans) add_subdirectory(user) -# add_subdirectory(vgroup) +add_subdirectory(vgroup) # add_subdirectory(common) diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mgmt/impl/test/db/db.cpp index de1a606c86..d465a62f2d 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mgmt/impl/test/db/db.cpp @@ -232,6 +232,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { SRpcMsg* pMsg = pClient->pRsp; ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); + // taosMsleep(1000000); } SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); diff --git a/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt b/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt new file mode 100644 index 0000000000..5670f9dbf2 --- /dev/null +++ b/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt @@ -0,0 +1,27 @@ +add_executable(dnode_test_vgroup "") + +target_sources(dnode_test_vgroup + PRIVATE + "vgroup.cpp" + "../sut/deploy.cpp" +) + +target_link_libraries( + dnode_test_vgroup + PUBLIC dnode + PUBLIC util + PUBLIC os + PUBLIC gtest_main +) + +target_include_directories(dnode_test_vgroup + PUBLIC + "${CMAKE_SOURCE_DIR}/include/server/dnode/mgmt" + "${CMAKE_CURRENT_SOURCE_DIR}/../../inc" + "${CMAKE_CURRENT_SOURCE_DIR}/../sut" +) + +add_test( + NAME dnode_test_vgroup + COMMAND dnode_test_vgroup +) diff --git a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp new file mode 100644 index 0000000000..3f16cd87d8 --- /dev/null +++ b/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp @@ -0,0 +1,224 @@ +/** + * @file db.cpp + * @author slguan (slguan@taosdata.com) + * @brief DNODE module vgroup-msg tests + * @version 0.1 + * @date 2021-12-20 + * + * @copyright Copyright (c) 2021 + * + */ + +#include "deploy.h" + +class DndTestVgroup : public ::testing::Test { + protected: + static SServer* CreateServer(const char* path, const char* fqdn, uint16_t port, const char* firstEp) { + SServer* pServer = createServer(path, fqdn, port, firstEp); + ASSERT(pServer); + return pServer; + } + + static void SetUpTestSuite() { + initLog("/tmp/tdlog"); + + const char* fqdn = "localhost"; + const char* firstEp = "localhost:9150"; + pServer = CreateServer("/tmp/dnode_test_vgroup", fqdn, 9150, firstEp); + pClient = createClient("root", "taosdata", fqdn, 9150); + taosMsleep(1100); + } + + static void TearDownTestSuite() { + stopServer(pServer); + dropClient(pClient); + pServer = NULL; + pClient = NULL; + } + + static SServer* pServer; + static SClient* pClient; + static int32_t connId; + + public: + void SetUp() override {} + void TearDown() override {} + + void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) { + SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); + pShow->type = showType; + if (db != NULL) { + strcpy(pShow->db, db); + } + SRpcMsg showRpcMsg = {0}; + showRpcMsg.pCont = pShow; + showRpcMsg.contLen = sizeof(SShowMsg); + showRpcMsg.msgType = TSDB_MSG_TYPE_SHOW; + + sendMsg(pClient, &showRpcMsg); + ASSERT_NE(pClient->pRsp, nullptr); + ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); + + SShowRsp* pShowRsp = (SShowRsp*)pClient->pRsp->pCont; + ASSERT_NE(pShowRsp, nullptr); + pShowRsp->showId = htonl(pShowRsp->showId); + pMeta = &pShowRsp->tableMeta; + pMeta->numOfTags = htonl(pMeta->numOfTags); + pMeta->numOfColumns = htonl(pMeta->numOfColumns); + pMeta->sversion = htonl(pMeta->sversion); + pMeta->tversion = htonl(pMeta->tversion); + pMeta->tuid = htobe64(pMeta->tuid); + pMeta->suid = htobe64(pMeta->suid); + + showId = pShowRsp->showId; + + EXPECT_NE(pShowRsp->showId, 0); + EXPECT_STREQ(pMeta->tbFname, showName); + EXPECT_EQ(pMeta->numOfTags, 0); + EXPECT_EQ(pMeta->numOfColumns, columns); + EXPECT_EQ(pMeta->precision, 0); + EXPECT_EQ(pMeta->tableType, 0); + EXPECT_EQ(pMeta->update, 0); + EXPECT_EQ(pMeta->sversion, 0); + EXPECT_EQ(pMeta->tversion, 0); + EXPECT_EQ(pMeta->tuid, 0); + EXPECT_EQ(pMeta->suid, 0); + } + + void CheckSchema(int32_t index, int8_t type, int32_t bytes, const char* name) { + SSchema* pSchema = &pMeta->pSchema[index]; + pSchema->bytes = htonl(pSchema->bytes); + EXPECT_EQ(pSchema->colId, 0); + EXPECT_EQ(pSchema->type, type); + EXPECT_EQ(pSchema->bytes, bytes); + EXPECT_STREQ(pSchema->name, name); + } + + void SendThenCheckShowRetrieveMsg(int32_t rows) { + SRetrieveTableMsg* pRetrieve = (SRetrieveTableMsg*)rpcMallocCont(sizeof(SRetrieveTableMsg)); + pRetrieve->showId = htonl(showId); + pRetrieve->free = 0; + + SRpcMsg retrieveRpcMsg = {0}; + retrieveRpcMsg.pCont = pRetrieve; + retrieveRpcMsg.contLen = sizeof(SRetrieveTableMsg); + retrieveRpcMsg.msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE; + + sendMsg(pClient, &retrieveRpcMsg); + + ASSERT_NE(pClient->pRsp, nullptr); + ASSERT_EQ(pClient->pRsp->code, 0); + ASSERT_NE(pClient->pRsp->pCont, nullptr); + + pRetrieveRsp = (SRetrieveTableRsp*)pClient->pRsp->pCont; + ASSERT_NE(pRetrieveRsp, nullptr); + pRetrieveRsp->numOfRows = htonl(pRetrieveRsp->numOfRows); + pRetrieveRsp->useconds = htobe64(pRetrieveRsp->useconds); + pRetrieveRsp->compLen = htonl(pRetrieveRsp->compLen); + + EXPECT_EQ(pRetrieveRsp->numOfRows, rows); + EXPECT_EQ(pRetrieveRsp->useconds, 0); + // EXPECT_EQ(pRetrieveRsp->completed, completed); + EXPECT_EQ(pRetrieveRsp->precision, TSDB_TIME_PRECISION_MILLI); + EXPECT_EQ(pRetrieveRsp->compressed, 0); + EXPECT_EQ(pRetrieveRsp->compLen, 0); + + pData = pRetrieveRsp->data; + pos = 0; + } + + void CheckInt8(int8_t val) { + int8_t data = *((int8_t*)(pData + pos)); + pos += sizeof(int8_t); + EXPECT_EQ(data, val); + } + + void CheckInt16(int16_t val) { + int16_t data = *((int16_t*)(pData + pos)); + pos += sizeof(int16_t); + EXPECT_EQ(data, val); + } + + void CheckInt32(int32_t val) { + int32_t data = *((int32_t*)(pData + pos)); + pos += sizeof(int32_t); + EXPECT_EQ(data, val); + } + + void CheckInt64(int64_t val) { + int64_t data = *((int64_t*)(pData + pos)); + pos += sizeof(int64_t); + EXPECT_EQ(data, val); + } + + void CheckTimestamp() { + int64_t data = *((int64_t*)(pData + pos)); + pos += sizeof(int64_t); + EXPECT_GT(data, 0); + } + + void CheckBinary(const char* val, int32_t len) { + pos += sizeof(VarDataLenT); + char* data = (char*)(pData + pos); + pos += len; + EXPECT_STREQ(data, val); + } + + int32_t showId; + STableMetaMsg* pMeta; + SRetrieveTableRsp* pRetrieveRsp; + char* pData; + int32_t pos; +}; + +SServer* DndTestVgroup::pServer; +SClient* DndTestVgroup::pClient; +int32_t DndTestVgroup::connId; + + +TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { + { + SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(sizeof(SCreateVnodeMsg)); + pReq->vgId = htonl(2); + pReq->dnodeId = htonl(1); + strcpy(pReq->db, "1.d1"); + pReq->dbUid = htobe64(9527); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRows = htonl(100); + pReq->minRows = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replica = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->selfIndex = 0; + for (int r = 0; r < pReq->replica; ++r) { + SReplica* pReplica = &pReq->replicas[r]; + pReplica->id = htonl(1); + pReplica->port = htons(9150); + } + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateVnodeMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_VNODE_IN; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + taosMsleep(1000000); + } + +} + -- GitLab