From ec1c569fb96fc1c8d063093006c00497f083cfd1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 16 Dec 2021 17:05:13 +0800 Subject: [PATCH] TD-10431 process use db msg --- include/common/taosmsg.h | 197 +++++++++----------- source/dnode/mgmt/impl/test/acct/acct.cpp | 18 +- source/dnode/mgmt/impl/test/db/db.cpp | 165 ++++++++++++---- source/dnode/mgmt/impl/test/dnode/dnode.cpp | 18 +- source/dnode/mgmt/impl/test/user/user.cpp | 18 +- source/dnode/mnode/impl/inc/mndDef.h | 3 +- source/dnode/mnode/impl/src/mndDb.c | 80 +++++++- source/dnode/mnode/impl/src/mndVgroup.c | 15 +- 8 files changed, 325 insertions(+), 189 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index e706cc9bad..b8ad59afee 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -215,8 +215,8 @@ typedef enum _mgmt_table { extern char *taosMsg[]; typedef struct SBuildTableMetaInput { - int32_t vgId; - char *tableFullName; + int32_t vgId; + char *tableFullName; } SBuildTableMetaInput; typedef struct SBuildUseDBInput { @@ -224,7 +224,6 @@ typedef struct SBuildUseDBInput { int32_t vgVersion; } SBuildUseDBInput; - #pragma pack(push, 1) // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta @@ -234,7 +233,7 @@ typedef struct { } SEpAddrMsg; typedef struct { - char* fqdn; + char *fqdn; uint16_t port; } SEpAddr1; @@ -261,10 +260,10 @@ typedef struct SSubmitBlk { // Submit message for this TSDB typedef struct SSubmitMsg { - SMsgHead header; - int32_t length; - int32_t numOfBlocks; - char blocks[]; + SMsgHead header; + int32_t length; + int32_t numOfBlocks; + char blocks[]; } SSubmitMsg; typedef struct { @@ -343,7 +342,7 @@ typedef struct { typedef struct { char tableFname[TSDB_TABLE_FNAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN]; - int16_t type; /* operation type */ + int16_t type; /* operation type */ int16_t numOfCols; /* number of schema */ int32_t tagValLen; SSchema schema[]; @@ -352,17 +351,17 @@ typedef struct { } SAlterTableMsg; typedef struct { - SMsgHead head; - int64_t uid; - int32_t tid; - int16_t tversion; - int16_t colId; - int8_t type; - int16_t bytes; - int32_t tagValLen; - int16_t numOfTags; - int32_t schemaLen; - char data[]; + SMsgHead head; + int64_t uid; + int32_t tid; + int16_t tversion; + int16_t colId; + int8_t type; + int16_t bytes; + int32_t tagValLen; + int16_t numOfTags; + int32_t schemaLen; + char data[]; } SUpdateTableTagValMsg; typedef struct { @@ -423,7 +422,7 @@ typedef struct { int32_t contLen; int32_t vgId; uint64_t uid; - char tableFname[TSDB_TABLE_FNAME_LEN]; + char tableFname[TSDB_TABLE_FNAME_LEN]; } SDropSTableMsg; typedef struct SColIndex { @@ -436,7 +435,7 @@ typedef struct SColIndex { typedef struct SColumnFilterInfo { int16_t lowerRelOptr; int16_t upperRelOptr; - int16_t filterstr; // denote if current column is char(binary/nchar) + int16_t filterstr; // denote if current column is char(binary/nchar) union { struct { @@ -455,9 +454,9 @@ typedef struct SColumnFilterInfo { } SColumnFilterInfo; typedef struct SColumnFilterList { - int16_t numOfFilters; - union{ - int64_t placeholder; + int16_t numOfFilters; + union { + int64_t placeholder; SColumnFilterInfo *filterInfo; }; } SColumnFilterList; @@ -466,10 +465,10 @@ typedef struct SColumnFilterList { * But for data in vnode side, we need all the following information. */ typedef struct SColumnInfo { - int16_t colId; - int16_t type; - int16_t bytes; - SColumnFilterList flist; + int16_t colId; + int16_t type; + int16_t bytes; + SColumnFilterList flist; } SColumnInfo; typedef struct STableIdInfo { @@ -483,14 +482,14 @@ typedef struct STimeWindow { } STimeWindow; typedef struct { - int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed - int32_t tsLen; // total length of ts comp block - int32_t tsNumOfBlocks; // ts comp block numbers - int32_t tsOrder; // ts comp block order + int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed + int32_t tsLen; // total length of ts comp block + int32_t tsNumOfBlocks; // ts comp block numbers + int32_t tsOrder; // ts comp block order } STsBufInfo; typedef struct SInterval { - int32_t tz; // query client timezone + int32_t tz; // query client timezone char intervalUnit; char slidingUnit; char offsetUnit; @@ -500,51 +499,51 @@ typedef struct SInterval { } SInterval; typedef struct { - SMsgHead head; - char version[TSDB_VERSION_LEN]; + SMsgHead head; + char version[TSDB_VERSION_LEN]; - bool stableQuery; // super table query or not - bool topBotQuery; // TODO used bitwise flag - bool interpQuery; // interp query or not - bool groupbyColumn; // denote if this is a groupby normal column query - bool hasTagResults; // if there are tag values in final result or not - bool timeWindowInterpo;// if the time window start/end required interpolation - bool queryBlockDist; // if query data block distribution - bool stabledev; // super table stddev query - bool tsCompQuery; // is tscomp query - bool simpleAgg; - bool pointInterpQuery; // point interpolation query - bool needReverseScan; // need reverse scan - bool stateWindow; // state window flag + bool stableQuery; // super table query or not + bool topBotQuery; // TODO used bitwise flag + bool interpQuery; // interp query or not + bool groupbyColumn; // denote if this is a groupby normal column query + bool hasTagResults; // if there are tag values in final result or not + bool timeWindowInterpo; // if the time window start/end required interpolation + bool queryBlockDist; // if query data block distribution + bool stabledev; // super table stddev query + bool tsCompQuery; // is tscomp query + bool simpleAgg; + bool pointInterpQuery; // point interpolation query + bool needReverseScan; // need reverse scan + bool stateWindow; // state window flag STimeWindow window; int32_t numOfTables; int16_t order; int16_t orderColId; - int16_t numOfCols; // the number of columns will be load from vnode + int16_t numOfCols; // the number of columns will be load from vnode SInterval interval; -// SSessionWindow sw; // session window - int16_t tagCondLen; // tag length in current query - int16_t colCondLen; // column length in current query - int16_t numOfGroupCols; // num of group by columns - int16_t orderByIdx; - int16_t orderType; // used in group by xx order by xxx - int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. - int16_t prjOrder; // global order in super table projection query. - int64_t limit; - int64_t offset; - int32_t queryType; // denote another query process - int16_t numOfOutput; // final output columns numbers - int16_t fillType; // interpolate type - int64_t fillVal; // default value array list - int32_t secondStageOutput; - STsBufInfo tsBuf; // tsBuf info - int32_t numOfTags; // number of tags columns involved - int32_t sqlstrLen; // sql query string - int32_t prevResultLen; // previous result length + // SSessionWindow sw; // session window + int16_t tagCondLen; // tag length in current query + int16_t colCondLen; // column length in current query + int16_t numOfGroupCols; // num of group by columns + int16_t orderByIdx; + int16_t orderType; // used in group by xx order by xxx + int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. + int16_t prjOrder; // global order in super table projection query. + int64_t limit; + int64_t offset; + int32_t queryType; // denote another query process + int16_t numOfOutput; // final output columns numbers + int16_t fillType; // interpolate type + int64_t fillVal; // default value array list + int32_t secondStageOutput; + STsBufInfo tsBuf; // tsBuf info + int32_t numOfTags; // number of tags columns involved + int32_t sqlstrLen; // sql query string + int32_t prevResultLen; // previous result length int32_t numOfOperator; - int32_t tableScanOperator;// table scan operator. -1 means no scan operator - int32_t udfNum; // number of udf function + int32_t tableScanOperator; // table scan operator. -1 means no scan operator + int32_t udfNum; // number of udf function int32_t udfContentOffset; int32_t udfContentLen; SColumnInfo tableCols[]; @@ -626,7 +625,6 @@ typedef struct { typedef struct { char db[TSDB_TABLE_FNAME_LEN]; - int8_t ignoreNotExists; int32_t vgVersion; int32_t reserve[8]; } SUseDbMsg; @@ -789,8 +787,8 @@ typedef struct { } SStbInfoMsg; typedef struct { - SMsgHead msgHead; - char tableFname[TSDB_TABLE_FNAME_LEN]; + SMsgHead msgHead; + char tableFname[TSDB_TABLE_FNAME_LEN]; } STableInfoMsg; typedef struct { @@ -832,19 +830,19 @@ typedef struct { } SVgroupsMsg, SVgroupsInfo; typedef struct { - char tbFname[TSDB_TABLE_FNAME_LEN]; // table id - char stbFname[TSDB_TABLE_FNAME_LEN]; - int32_t numOfTags; - int32_t numOfColumns; - int8_t precision; - int8_t tableType; - int8_t update; - int32_t sversion; - int32_t tversion; - uint64_t tuid; - uint64_t suid; - int32_t vgId; - SSchema pSchema[]; + char tbFname[TSDB_TABLE_FNAME_LEN]; // table id + char stbFname[TSDB_TABLE_FNAME_LEN]; + int32_t numOfTags; + int32_t numOfColumns; + int8_t precision; + int8_t tableType; + int8_t update; + int32_t sversion; + int32_t tversion; + uint64_t tuid; + uint64_t suid; + int32_t vgId; + SSchema pSchema[]; } STableMetaMsg; typedef struct SMultiTableMeta { @@ -867,13 +865,11 @@ typedef struct { typedef struct { char db[TSDB_FULL_DB_NAME_LEN]; int32_t vgVersion; - int32_t vgNum; + int32_t vgNum; int8_t hashMethod; SVgroupInfo vgroupInfo[]; } SUseDbRsp; - - /* * sql: show tables like '%a_%' * payload is the query condition, e.g., '%a_%' @@ -887,7 +883,7 @@ typedef struct { } SShowMsg; typedef struct { - char db[TSDB_FULL_DB_NAME_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; int32_t numOfVgroup; int32_t vgid[]; } SCompactMsg; @@ -1008,45 +1004,37 @@ typedef struct { } SAuthMsg, SAuthRsp; typedef struct { - int8_t finished; - int8_t reserved1[7]; - char name[TSDB_STEP_NAME_LEN]; - char desc[TSDB_STEP_DESC_LEN]; + int8_t finished; + int8_t reserved1[7]; + char name[TSDB_STEP_NAME_LEN]; + char desc[TSDB_STEP_DESC_LEN]; } SStartupMsg; // mq related typedef struct { - } SMqConnectReq; typedef struct { - } SMqConnectRsp; typedef struct { - } SMqDisconnectReq; typedef struct { - } SMqDisconnectRsp; typedef struct { - } SMqAckReq; typedef struct { - } SMqAckRsp; typedef struct { - } SMqResetReq; typedef struct { - } SMqResetRsp; -//mq related end +// mq related end typedef struct { /* data */ @@ -1100,7 +1088,6 @@ typedef struct { /* data */ } SUpdateTagValRsp; - #pragma pack(pop) #ifdef __cplusplus diff --git a/source/dnode/mgmt/impl/test/acct/acct.cpp b/source/dnode/mgmt/impl/test/acct/acct.cpp index 9cf4d5a46d..c548524216 100644 --- a/source/dnode/mgmt/impl/test/acct/acct.cpp +++ b/source/dnode/mgmt/impl/test/acct/acct.cpp @@ -1,16 +1,12 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. +/** + * @file vnodeApiTests.cpp + * @author slguan (slguan@taosdata.com) + * @brief DNODE module API tests + * @version 0.1 + * @date 2021-12-15 * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. + * @copyright Copyright (c) 2021 * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . */ #include "deploy.h" diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mgmt/impl/test/db/db.cpp index 56ba893723..4722af67dc 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mgmt/impl/test/db/db.cpp @@ -1,16 +1,12 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. +/** + * @file vnodeApiTests.cpp + * @author slguan (slguan@taosdata.com) + * @brief DNODE module API tests + * @version 0.1 + * @date 2021-12-16 * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. + * @copyright Copyright (c) 2021 * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . */ #include "deploy.h" @@ -30,7 +26,7 @@ class DndTestDb : public ::testing::Test { const char* firstEp = "localhost:9040"; pServer = CreateServer("/tmp/dnode_test_db", fqdn, 9040, firstEp); pClient = createClient("root", "taosdata", fqdn, 9040); - taosMsleep(300); + taosMsleep(1100); } static void TearDownTestSuite() { @@ -184,31 +180,30 @@ SClient* DndTestDb::pClient; int32_t DndTestDb::connId; TEST_F(DndTestDb, 01_ShowDb) { - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16, NULL); + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name"); CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time"); - CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "replica"); - CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "quorum"); - CheckSchema(4, TSDB_DATA_TYPE_SMALLINT, 2, "days"); - CheckSchema(5, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2"); - CheckSchema(6, TSDB_DATA_TYPE_INT, 4, "cache(MB)"); - CheckSchema(7, TSDB_DATA_TYPE_INT, 4, "blocks"); - CheckSchema(8, TSDB_DATA_TYPE_INT, 4, "minrows"); - CheckSchema(9, TSDB_DATA_TYPE_INT, 4, "maxrows"); - CheckSchema(10, TSDB_DATA_TYPE_TINYINT, 1, "wallevel"); - CheckSchema(11, TSDB_DATA_TYPE_INT, 4, "fsync"); - CheckSchema(12, TSDB_DATA_TYPE_TINYINT, 1, "comp"); - CheckSchema(13, TSDB_DATA_TYPE_TINYINT, 1, "cachelast"); - CheckSchema(14, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision"); - CheckSchema(15, TSDB_DATA_TYPE_TINYINT, 1, "update"); + CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "vgroups"); + CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "replica"); + CheckSchema(4, TSDB_DATA_TYPE_SMALLINT, 2, "quorum"); + CheckSchema(5, TSDB_DATA_TYPE_SMALLINT, 2, "days"); + CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2"); + CheckSchema(7, TSDB_DATA_TYPE_INT, 4, "cache(MB)"); + CheckSchema(8, TSDB_DATA_TYPE_INT, 4, "blocks"); + CheckSchema(9, TSDB_DATA_TYPE_INT, 4, "minrows"); + CheckSchema(10, TSDB_DATA_TYPE_INT, 4, "maxrows"); + CheckSchema(11, TSDB_DATA_TYPE_TINYINT, 1, "wallevel"); + CheckSchema(12, TSDB_DATA_TYPE_INT, 4, "fsync"); + CheckSchema(13, TSDB_DATA_TYPE_TINYINT, 1, "comp"); + CheckSchema(14, TSDB_DATA_TYPE_TINYINT, 1, "cachelast"); + CheckSchema(15, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision"); + CheckSchema(16, TSDB_DATA_TYPE_TINYINT, 1, "update"); SendThenCheckShowRetrieveMsg(0); } TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { { - taosMsleep(1100); - SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); strcpy(pReq->db, "1.d1"); pReq->numOfVgroups = htonl(2); @@ -242,10 +237,11 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { ASSERT_EQ(pMsg->code, 0); } - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16, NULL); + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); SendThenCheckShowRetrieveMsg(1); CheckBinary("d1", TSDB_DB_NAME_LEN - 1); CheckTimestamp(); + CheckInt16(2); // vgroups CheckInt16(1); // replica CheckInt16(1); // quorum CheckInt16(10); // days @@ -299,10 +295,11 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { ASSERT_EQ(pMsg->code, 0); } - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16, NULL); + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); SendThenCheckShowRetrieveMsg(1); CheckBinary("d1", TSDB_DB_NAME_LEN - 1); CheckTimestamp(); + CheckInt16(2); // vgroups CheckInt16(1); // replica CheckInt16(2); // quorum CheckInt16(10); // days @@ -330,10 +327,11 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { uInfo("all server is running"); - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16, NULL); + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); SendThenCheckShowRetrieveMsg(1); CheckBinary("d1", TSDB_DB_NAME_LEN - 1); CheckTimestamp(); + CheckInt16(2); // vgroups CheckInt16(1); // replica CheckInt16(2); // quorum CheckInt16(10); // days @@ -350,7 +348,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { CheckInt8(0); // update { - SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(sizeof(SAlterDbMsg)); + SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(sizeof(SDropDbMsg)); strcpy(pReq->db, "1.d1"); SRpcMsg rpcMsg = {0}; @@ -364,6 +362,103 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { ASSERT_EQ(pMsg->code, 0); } - SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16, NULL); + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); SendThenCheckShowRetrieveMsg(0); -} \ No newline at end of file +} + +TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { + { + SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); + strcpy(pReq->db, "1.d2"); + pReq->numOfVgroups = htonl(2); + pReq->cacheBlockSize = htonl(16); + pReq->totalBlocks = htonl(10); + pReq->daysPerFile = htonl(10); + pReq->daysToKeep0 = htonl(3650); + pReq->daysToKeep1 = htonl(3650); + pReq->daysToKeep2 = htonl(3650); + pReq->minRowsPerFileBlock = htonl(100); + pReq->maxRowsPerFileBlock = htonl(4096); + pReq->commitTime = htonl(3600); + pReq->fsyncPeriod = htonl(3000); + pReq->walLevel = 1; + pReq->precision = 0; + pReq->compression = 2; + pReq->replications = 1; + pReq->quorum = 1; + pReq->update = 0; + pReq->cacheLastRow = 0; + pReq->ignoreExist = 1; + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SCreateDbMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } + + SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL); + SendThenCheckShowRetrieveMsg(1); + CheckBinary("d2", TSDB_DB_NAME_LEN - 1); + + { + SUseDbMsg* pReq = (SUseDbMsg*)rpcMallocCont(sizeof(SUseDbMsg)); + strcpy(pReq->db, "1.d2"); + pReq->vgVersion = htonl(-1); + + SRpcMsg rpcMsg = {0}; + rpcMsg.pCont = pReq; + rpcMsg.contLen = sizeof(SUseDbMsg); + rpcMsg.msgType = TSDB_MSG_TYPE_USE_DB; + + sendMsg(pClient, &rpcMsg); + SRpcMsg* pMsg = pClient->pRsp; + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + + SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont; + EXPECT_STREQ(pRsp->db, "1.d2"); + pRsp->vgVersion = htonl(pRsp->vgVersion); + pRsp->vgNum = htonl(pRsp->vgNum); + pRsp->hashMethod = pRsp->hashMethod; + EXPECT_EQ(pRsp->vgVersion, 1); + EXPECT_EQ(pRsp->vgNum, 2); + EXPECT_EQ(pRsp->hashMethod, 1); + + { + SVgroupInfo* pInfo = &pRsp->vgroupInfo[0]; + pInfo->vgId = htonl(pInfo->vgId); + pInfo->hashBegin = htonl(pInfo->hashBegin); + pInfo->hashEnd = htonl(pInfo->hashEnd); + EXPECT_GT(pInfo->vgId, 0); + EXPECT_EQ(pInfo->hashBegin, 0); + EXPECT_EQ(pInfo->hashEnd, INT32_MAX / 2 - 1); + EXPECT_EQ(pInfo->inUse, 0); + EXPECT_EQ(pInfo->numOfEps, 1); + SEpAddrMsg* pAddr = &pInfo->epAddr[0]; + pAddr->port = htons(pAddr->port); + EXPECT_EQ(pAddr->port, 9040); + EXPECT_STREQ(pAddr->fqdn, "localhost"); + } + + { + SVgroupInfo* pInfo = &pRsp->vgroupInfo[1]; + pInfo->vgId = htonl(pInfo->vgId); + pInfo->hashBegin = htonl(pInfo->hashBegin); + pInfo->hashEnd = htonl(pInfo->hashEnd); + EXPECT_GT(pInfo->vgId, 0); + EXPECT_EQ(pInfo->hashBegin, INT32_MAX / 2); + EXPECT_EQ(pInfo->hashEnd, INT32_MAX); + EXPECT_EQ(pInfo->inUse, 0); + EXPECT_EQ(pInfo->numOfEps, 1); + SEpAddrMsg* pAddr = &pInfo->epAddr[0]; + pAddr->port = htons(pAddr->port); + EXPECT_EQ(pAddr->port, 9040); + EXPECT_STREQ(pAddr->fqdn, "localhost"); + } + } +} diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index e82d6e2949..6a8d1c7735 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -1,16 +1,12 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. +/** + * @file vnodeApiTests.cpp + * @author slguan (slguan@taosdata.com) + * @brief DNODE module API tests + * @version 0.1 + * @date 2021-12-14 * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. + * @copyright Copyright (c) 2021 * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . */ #include "deploy.h" diff --git a/source/dnode/mgmt/impl/test/user/user.cpp b/source/dnode/mgmt/impl/test/user/user.cpp index 7a4abcf1bc..7d08484475 100644 --- a/source/dnode/mgmt/impl/test/user/user.cpp +++ b/source/dnode/mgmt/impl/test/user/user.cpp @@ -1,16 +1,12 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. +/** + * @file vnodeApiTests.cpp + * @author slguan (slguan@taosdata.com) + * @brief DNODE module API tests + * @version 0.1 + * @date 2021-12-12 * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. + * @copyright Copyright (c) 2021 * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . */ #include "deploy.h" diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index cc8c541548..5a678118f3 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -210,7 +210,7 @@ typedef struct { int32_t cfgVersion; int32_t vgVersion; int32_t numOfVgroups; - int8_t hashMethod; // default is 1 + int8_t hashMethod; // default is 1 SDbCfg cfg; } SDbObj; @@ -227,6 +227,7 @@ typedef struct { int32_t hashBegin; int32_t hashEnd; char dbName[TSDB_FULL_DB_NAME_LEN]; + int64_t dbUid; int32_t numOfTables; int32_t numOfTimeSeries; int64_t totalStorage; diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 32d2fb3499..1c78ec5408 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -292,8 +292,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN); dbObj.numOfVgroups = pCreate->numOfVgroups; dbObj.hashMethod = 1; - dbObj.cfgVersion = 0; - dbObj.vgVersion = 0; + dbObj.cfgVersion = 1; + dbObj.vgVersion = 1; dbObj.cfg = (SDbCfg){.cacheBlockSize = pCreate->cacheBlockSize, .totalBlocks = pCreate->totalBlocks, .daysPerFile = pCreate->daysPerFile, @@ -614,17 +614,69 @@ static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; SUseDbMsg *pUse = pMsg->rpcMsg.pCont; + pUse->vgVersion = htonl(pUse->vgVersion); - SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); - if (pDb != NULL) { - strncpy(pMsg->db, pUse->db, TSDB_FULL_DB_NAME_LEN); - mndReleaseDb(pMnode, pDb); - return 0; - } else { - mError("db:%s, failed to process use db msg since %s", pMsg->db, terrstr()); + SDbObj *pDb = mndAcquireDb(pMnode, pUse->db); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + mError("db:%s, failed to process use db msg since %s", pUse->db, terrstr()); return -1; } + + int32_t contLen = sizeof(SUseDbRsp) + pDb->numOfVgroups * sizeof(SVgroupInfo); + SUseDbRsp *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int32_t vindex = 0; + + if (pUse->vgVersion < pDb->vgVersion) { + void *pIter = NULL; + while (vindex < pDb->numOfVgroups) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (pVgroup->dbUid == pDb->uid) { + SVgroupInfo *pInfo = &pRsp->vgroupInfo[vindex]; + pInfo->vgId = htonl(pVgroup->vgId); + pInfo->hashBegin = htonl(pVgroup->hashBegin); + pInfo->hashEnd = htonl(pVgroup->hashEnd); + pInfo->numOfEps = pVgroup->replica; + for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; + SEpAddrMsg *pEpArrr = &pInfo->epAddr[gid]; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode != NULL) { + memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + pEpArrr->port = htons(pDnode->port); + } + mndReleaseDnode(pMnode, pDnode); + if (pVgid->role == TAOS_SYNC_STATE_LEADER) { + pInfo->inUse = gid; + } + } + vindex++; + } + + sdbRelease(pSdb, pVgroup); + } + } + + memcpy(pRsp->db, pDb->name, TSDB_FULL_DB_NAME_LEN); + pRsp->vgVersion = htonl(pDb->vgVersion); + pRsp->vgNum = htonl(vindex); + pRsp->hashMethod = pDb->hashMethod; + + pMsg->pCont = pRsp; + pMsg->contLen = contLen; + mndReleaseDb(pMnode, pDb); + + return 0; } static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { @@ -674,6 +726,12 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe pSchema[cols].bytes = htons(pShow->bytes[cols]); cols++; + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "vgroups"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "replica"); @@ -811,6 +869,10 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 *(int64_t *)pWrite = pDb->createdTime; cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pDb->numOfVgroups; + cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; *(int16_t *)pWrite = pDb->cfg.replications; cols++; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 03d18b37e2..7dfe8c26ea 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -24,9 +24,9 @@ #define TSDB_VGROUP_VER_NUM 1 #define TSDB_VGROUP_RESERVE_SIZE 64 -static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); -static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); -static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup); +static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); +static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); +static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup); static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg); @@ -80,6 +80,7 @@ SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin) SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd) SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN) + SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid) SDB_SET_INT8(pRaw, dataPos, pVgroup->replica) for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; @@ -102,7 +103,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { return NULL; } - SSdbRow *pRow = sdbAllocRow(sizeof(SDbObj)); + SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj)); SVgObj *pVgroup = sdbGetRowObj(pRow); if (pVgroup == NULL) return NULL; @@ -114,6 +115,7 @@ SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashBegin) SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashEnd) SDB_GET_BINARY(pRaw, pRow, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN) + SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->dbUid) SDB_GET_INT8(pRaw, pRow, dataPos, &pVgroup->replica) for (int8_t i = 0; i < pVgroup->replica; ++i) { SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; @@ -222,12 +224,13 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { pVgroup->vgId = maxVgId++; pVgroup->createdTime = taosGetTimestampMs(); pVgroup->updateTime = pVgroups->createdTime; - pVgroup->version = 0; + pVgroup->version = 1; + pVgroup->dbUid = pDb->uid; pVgroup->hashBegin = hashMin + hashInterval * v; if (v == pDb->numOfVgroups - 1) { pVgroup->hashEnd = hashMax; } else { - pVgroup->hashEnd = hashMin + hashInterval * (v + 1); + pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1; } memcpy(pVgroup->dbName, pDb->name, TSDB_FULL_DB_NAME_LEN); -- GitLab