提交 3ac8715c 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into catalog_dev

...@@ -215,8 +215,8 @@ typedef enum _mgmt_table { ...@@ -215,8 +215,8 @@ typedef enum _mgmt_table {
extern char *taosMsg[]; extern char *taosMsg[];
typedef struct SBuildTableMetaInput { typedef struct SBuildTableMetaInput {
int32_t vgId; int32_t vgId;
char *tableFullName; char *tableFullName;
} SBuildTableMetaInput; } SBuildTableMetaInput;
typedef struct SBuildUseDBInput { typedef struct SBuildUseDBInput {
...@@ -224,7 +224,6 @@ typedef struct SBuildUseDBInput { ...@@ -224,7 +224,6 @@ typedef struct SBuildUseDBInput {
int32_t vgVersion; int32_t vgVersion;
} SBuildUseDBInput; } SBuildUseDBInput;
#pragma pack(push, 1) #pragma pack(push, 1)
// null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta // null-terminated string instead of char array to avoid too many memory consumption in case of more than 1M tableMeta
...@@ -234,7 +233,7 @@ typedef struct { ...@@ -234,7 +233,7 @@ typedef struct {
} SEpAddrMsg; } SEpAddrMsg;
typedef struct { typedef struct {
char* fqdn; char *fqdn;
uint16_t port; uint16_t port;
} SEpAddr1; } SEpAddr1;
...@@ -261,10 +260,10 @@ typedef struct SSubmitBlk { ...@@ -261,10 +260,10 @@ typedef struct SSubmitBlk {
// Submit message for this TSDB // Submit message for this TSDB
typedef struct SSubmitMsg { typedef struct SSubmitMsg {
SMsgHead header; SMsgHead header;
int32_t length; int32_t length;
int32_t numOfBlocks; int32_t numOfBlocks;
char blocks[]; char blocks[];
} SSubmitMsg; } SSubmitMsg;
typedef struct { typedef struct {
...@@ -343,7 +342,7 @@ typedef struct { ...@@ -343,7 +342,7 @@ typedef struct {
typedef struct { typedef struct {
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int16_t type; /* operation type */ int16_t type; /* operation type */
int16_t numOfCols; /* number of schema */ int16_t numOfCols; /* number of schema */
int32_t tagValLen; int32_t tagValLen;
SSchema schema[]; SSchema schema[];
...@@ -352,17 +351,17 @@ typedef struct { ...@@ -352,17 +351,17 @@ typedef struct {
} SAlterTableMsg; } SAlterTableMsg;
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
int64_t uid; int64_t uid;
int32_t tid; int32_t tid;
int16_t tversion; int16_t tversion;
int16_t colId; int16_t colId;
int8_t type; int8_t type;
int16_t bytes; int16_t bytes;
int32_t tagValLen; int32_t tagValLen;
int16_t numOfTags; int16_t numOfTags;
int32_t schemaLen; int32_t schemaLen;
char data[]; char data[];
} SUpdateTableTagValMsg; } SUpdateTableTagValMsg;
typedef struct { typedef struct {
...@@ -425,7 +424,7 @@ typedef struct { ...@@ -425,7 +424,7 @@ typedef struct {
int32_t contLen; int32_t contLen;
int32_t vgId; int32_t vgId;
uint64_t uid; uint64_t uid;
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
} SDropSTableMsg; } SDropSTableMsg;
typedef struct SColIndex { typedef struct SColIndex {
...@@ -438,7 +437,7 @@ typedef struct SColIndex { ...@@ -438,7 +437,7 @@ typedef struct SColIndex {
typedef struct SColumnFilterInfo { typedef struct SColumnFilterInfo {
int16_t lowerRelOptr; int16_t lowerRelOptr;
int16_t upperRelOptr; int16_t upperRelOptr;
int16_t filterstr; // denote if current column is char(binary/nchar) int16_t filterstr; // denote if current column is char(binary/nchar)
union { union {
struct { struct {
...@@ -457,9 +456,9 @@ typedef struct SColumnFilterInfo { ...@@ -457,9 +456,9 @@ typedef struct SColumnFilterInfo {
} SColumnFilterInfo; } SColumnFilterInfo;
typedef struct SColumnFilterList { typedef struct SColumnFilterList {
int16_t numOfFilters; int16_t numOfFilters;
union{ union {
int64_t placeholder; int64_t placeholder;
SColumnFilterInfo *filterInfo; SColumnFilterInfo *filterInfo;
}; };
} SColumnFilterList; } SColumnFilterList;
...@@ -468,10 +467,10 @@ typedef struct SColumnFilterList { ...@@ -468,10 +467,10 @@ typedef struct SColumnFilterList {
* But for data in vnode side, we need all the following information. * But for data in vnode side, we need all the following information.
*/ */
typedef struct SColumnInfo { typedef struct SColumnInfo {
int16_t colId; int16_t colId;
int16_t type; int16_t type;
int16_t bytes; int16_t bytes;
SColumnFilterList flist; SColumnFilterList flist;
} SColumnInfo; } SColumnInfo;
typedef struct STableIdInfo { typedef struct STableIdInfo {
...@@ -485,14 +484,14 @@ typedef struct STimeWindow { ...@@ -485,14 +484,14 @@ typedef struct STimeWindow {
} STimeWindow; } STimeWindow;
typedef struct { typedef struct {
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers int32_t tsNumOfBlocks; // ts comp block numbers
int32_t tsOrder; // ts comp block order int32_t tsOrder; // ts comp block order
} STsBufInfo; } STsBufInfo;
typedef struct SInterval { typedef struct SInterval {
int32_t tz; // query client timezone int32_t tz; // query client timezone
char intervalUnit; char intervalUnit;
char slidingUnit; char slidingUnit;
char offsetUnit; char offsetUnit;
...@@ -502,51 +501,51 @@ typedef struct SInterval { ...@@ -502,51 +501,51 @@ typedef struct SInterval {
} SInterval; } SInterval;
typedef struct { typedef struct {
SMsgHead head; SMsgHead head;
char version[TSDB_VERSION_LEN]; char version[TSDB_VERSION_LEN];
bool stableQuery; // super table query or not bool stableQuery; // super table query or not
bool topBotQuery; // TODO used bitwise flag bool topBotQuery; // TODO used bitwise flag
bool interpQuery; // interp query or not bool interpQuery; // interp query or not
bool groupbyColumn; // denote if this is a groupby normal column query bool groupbyColumn; // denote if this is a groupby normal column query
bool hasTagResults; // if there are tag values in final result or not bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation bool timeWindowInterpo; // if the time window start/end required interpolation
bool queryBlockDist; // if query data block distribution bool queryBlockDist; // if query data block distribution
bool stabledev; // super table stddev query bool stabledev; // super table stddev query
bool tsCompQuery; // is tscomp query bool tsCompQuery; // is tscomp query
bool simpleAgg; bool simpleAgg;
bool pointInterpQuery; // point interpolation query bool pointInterpQuery; // point interpolation query
bool needReverseScan; // need reverse scan bool needReverseScan; // need reverse scan
bool stateWindow; // state window flag bool stateWindow; // state window flag
STimeWindow window; STimeWindow window;
int32_t numOfTables; int32_t numOfTables;
int16_t order; int16_t order;
int16_t orderColId; int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode int16_t numOfCols; // the number of columns will be load from vnode
SInterval interval; SInterval interval;
// SSessionWindow sw; // session window // SSessionWindow sw; // session window
int16_t tagCondLen; // tag length in current query int16_t tagCondLen; // tag length in current query
int16_t colCondLen; // column length in current query int16_t colCondLen; // column length in current query
int16_t numOfGroupCols; // num of group by columns int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx; int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx int16_t orderType; // used in group by xx order by xxx
int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query. int64_t vgroupLimit; // limit the number of rows for each table, used in order by + limit in stable projection query.
int16_t prjOrder; // global order in super table projection query. int16_t prjOrder; // global order in super table projection query.
int64_t limit; int64_t limit;
int64_t offset; int64_t offset;
int32_t queryType; // denote another query process int32_t queryType; // denote another query process
int16_t numOfOutput; // final output columns numbers int16_t numOfOutput; // final output columns numbers
int16_t fillType; // interpolate type int16_t fillType; // interpolate type
int64_t fillVal; // default value array list int64_t fillVal; // default value array list
int32_t secondStageOutput; int32_t secondStageOutput;
STsBufInfo tsBuf; // tsBuf info STsBufInfo tsBuf; // tsBuf info
int32_t numOfTags; // number of tags columns involved int32_t numOfTags; // number of tags columns involved
int32_t sqlstrLen; // sql query string int32_t sqlstrLen; // sql query string
int32_t prevResultLen; // previous result length int32_t prevResultLen; // previous result length
int32_t numOfOperator; int32_t numOfOperator;
int32_t tableScanOperator;// table scan operator. -1 means no scan operator int32_t tableScanOperator; // table scan operator. -1 means no scan operator
int32_t udfNum; // number of udf function int32_t udfNum; // number of udf function
int32_t udfContentOffset; int32_t udfContentOffset;
int32_t udfContentLen; int32_t udfContentLen;
SColumnInfo tableCols[]; SColumnInfo tableCols[];
...@@ -585,6 +584,7 @@ typedef struct SRetrieveTableRsp { ...@@ -585,6 +584,7 @@ typedef struct SRetrieveTableRsp {
typedef struct { typedef struct {
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int32_t numOfVgroups;
int32_t cacheBlockSize; // MB int32_t cacheBlockSize; // MB
int32_t totalBlocks; int32_t totalBlocks;
int32_t daysPerFile; int32_t daysPerFile;
...@@ -627,7 +627,6 @@ typedef struct { ...@@ -627,7 +627,6 @@ typedef struct {
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
int8_t ignoreNotExists;
int32_t vgVersion; int32_t vgVersion;
int32_t reserve[8]; int32_t reserve[8];
} SUseDbMsg; } SUseDbMsg;
...@@ -790,8 +789,8 @@ typedef struct { ...@@ -790,8 +789,8 @@ typedef struct {
} SStbInfoMsg; } SStbInfoMsg;
typedef struct { typedef struct {
SMsgHead msgHead; SMsgHead msgHead;
char tableFname[TSDB_TABLE_FNAME_LEN]; char tableFname[TSDB_TABLE_FNAME_LEN];
} STableInfoMsg; } STableInfoMsg;
typedef struct { typedef struct {
...@@ -833,19 +832,19 @@ typedef struct { ...@@ -833,19 +832,19 @@ typedef struct {
} SVgroupsMsg, SVgroupsInfo; } SVgroupsMsg, SVgroupsInfo;
typedef struct { typedef struct {
char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name char tbFname[TSDB_TABLE_FNAME_LEN]; // table full name
char stbFname[TSDB_TABLE_FNAME_LEN]; char stbFname[TSDB_TABLE_FNAME_LEN];
int32_t numOfTags; int32_t numOfTags;
int32_t numOfColumns; int32_t numOfColumns;
int8_t precision; int8_t precision;
int8_t tableType; int8_t tableType;
int8_t update; int8_t update;
int32_t sversion; int32_t sversion;
int32_t tversion; int32_t tversion;
uint64_t tuid; uint64_t tuid;
uint64_t suid; uint64_t suid;
int32_t vgId; int32_t vgId;
SSchema pSchema[]; SSchema pSchema[];
} STableMetaMsg; } STableMetaMsg;
typedef struct SMultiTableMeta { typedef struct SMultiTableMeta {
...@@ -873,8 +872,6 @@ typedef struct { ...@@ -873,8 +872,6 @@ typedef struct {
SVgroupInfo vgroupInfo[]; SVgroupInfo vgroupInfo[];
} SUseDbRsp; } SUseDbRsp;
/* /*
* sql: show tables like '%a_%' * sql: show tables like '%a_%'
* payload is the query condition, e.g., '%a_%' * payload is the query condition, e.g., '%a_%'
...@@ -888,7 +885,7 @@ typedef struct { ...@@ -888,7 +885,7 @@ typedef struct {
} SShowMsg; } SShowMsg;
typedef struct { typedef struct {
char db[TSDB_FULL_DB_NAME_LEN]; char db[TSDB_FULL_DB_NAME_LEN];
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t vgid[]; int32_t vgid[];
} SCompactMsg; } SCompactMsg;
...@@ -1009,45 +1006,37 @@ typedef struct { ...@@ -1009,45 +1006,37 @@ typedef struct {
} SAuthMsg, SAuthRsp; } SAuthMsg, SAuthRsp;
typedef struct { typedef struct {
int8_t finished; int8_t finished;
int8_t reserved1[7]; int8_t reserved1[7];
char name[TSDB_STEP_NAME_LEN]; char name[TSDB_STEP_NAME_LEN];
char desc[TSDB_STEP_DESC_LEN]; char desc[TSDB_STEP_DESC_LEN];
} SStartupMsg; } SStartupMsg;
// mq related // mq related
typedef struct { typedef struct {
} SMqConnectReq; } SMqConnectReq;
typedef struct { typedef struct {
} SMqConnectRsp; } SMqConnectRsp;
typedef struct { typedef struct {
} SMqDisconnectReq; } SMqDisconnectReq;
typedef struct { typedef struct {
} SMqDisconnectRsp; } SMqDisconnectRsp;
typedef struct { typedef struct {
} SMqAckReq; } SMqAckReq;
typedef struct { typedef struct {
} SMqAckRsp; } SMqAckRsp;
typedef struct { typedef struct {
} SMqResetReq; } SMqResetReq;
typedef struct { typedef struct {
} SMqResetRsp; } SMqResetRsp;
//mq related end // mq related end
typedef struct { typedef struct {
/* data */ /* data */
...@@ -1101,7 +1090,6 @@ typedef struct { ...@@ -1101,7 +1090,6 @@ typedef struct {
/* data */ /* data */
} SUpdateTagValRsp; } SUpdateTagValRsp;
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -233,8 +233,8 @@ do { \ ...@@ -233,8 +233,8 @@ do { \
#define TSDB_CQ_SQL_SIZE 1024 #define TSDB_CQ_SQL_SIZE 1024
#define TSDB_MIN_VNODES 64 #define TSDB_MIN_VNODES 64
#define TSDB_MAX_VNODES 512 #define TSDB_MAX_VNODES 512
#define TSDB_MIN_VNODES_PER_DB 2 #define TSDB_MIN_VNODES_PER_DB 1
#define TSDB_MAX_VNODES_PER_DB 64 #define TSDB_MAX_VNODES_PER_DB 4096
#define TSDB_DNODE_ROLE_ANY 0 #define TSDB_DNODE_ROLE_ANY 0
#define TSDB_DNODE_ROLE_MGMT 1 #define TSDB_DNODE_ROLE_MGMT 1
...@@ -247,7 +247,7 @@ do { \ ...@@ -247,7 +247,7 @@ do { \
#define TSDB_RES_COL_ID (-5000) #define TSDB_RES_COL_ID (-5000)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta #define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
#define TSDB_MIN_CACHE_BLOCK_SIZE 1 #define TSDB_MIN_CACHE_BLOCK_SIZE 1
#define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode #define TSDB_MAX_CACHE_BLOCK_SIZE 128 // 128MB for each vnode
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 #define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16
......
/* /**
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * @file vnodeApiTests.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module acct-msg tests
* @version 0.1
* @date 2021-12-15
* *
* This program is free software: you can use, redistribute, and/or modify * @copyright Copyright (c) 2021
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "deploy.h" #include "deploy.h"
...@@ -49,7 +45,7 @@ SServer* DndTestAcct::pServer; ...@@ -49,7 +45,7 @@ SServer* DndTestAcct::pServer;
SClient* DndTestAcct::pClient; SClient* DndTestAcct::pClient;
int32_t DndTestAcct::connId; int32_t DndTestAcct::connId;
TEST_F(DndTestAcct, CreateAcct) { TEST_F(DndTestAcct, 01_CreateAcct) {
ASSERT_NE(pClient, nullptr); ASSERT_NE(pClient, nullptr);
SCreateAcctMsg* pReq = (SCreateAcctMsg*)rpcMallocCont(sizeof(SCreateAcctMsg)); SCreateAcctMsg* pReq = (SCreateAcctMsg*)rpcMallocCont(sizeof(SCreateAcctMsg));
...@@ -65,7 +61,7 @@ TEST_F(DndTestAcct, CreateAcct) { ...@@ -65,7 +61,7 @@ TEST_F(DndTestAcct, CreateAcct) {
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED); ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
} }
TEST_F(DndTestAcct, AlterAcct) { TEST_F(DndTestAcct, 02_AlterAcct) {
ASSERT_NE(pClient, nullptr); ASSERT_NE(pClient, nullptr);
SAlterAcctMsg* pReq = (SAlterAcctMsg*)rpcMallocCont(sizeof(SAlterAcctMsg)); SAlterAcctMsg* pReq = (SAlterAcctMsg*)rpcMallocCont(sizeof(SAlterAcctMsg));
...@@ -81,7 +77,7 @@ TEST_F(DndTestAcct, AlterAcct) { ...@@ -81,7 +77,7 @@ TEST_F(DndTestAcct, AlterAcct) {
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED); ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
} }
TEST_F(DndTestAcct, DropAcct) { TEST_F(DndTestAcct, 03_DropAcct) {
ASSERT_NE(pClient, nullptr); ASSERT_NE(pClient, nullptr);
SDropAcctMsg* pReq = (SDropAcctMsg*)rpcMallocCont(sizeof(SDropAcctMsg)); SDropAcctMsg* pReq = (SDropAcctMsg*)rpcMallocCont(sizeof(SDropAcctMsg));
...@@ -97,7 +93,7 @@ TEST_F(DndTestAcct, DropAcct) { ...@@ -97,7 +93,7 @@ TEST_F(DndTestAcct, DropAcct) {
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED); ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MSG_NOT_PROCESSED);
} }
TEST_F(DndTestAcct, ShowAcct) { TEST_F(DndTestAcct, 04_ShowAcct) {
ASSERT_NE(pClient, nullptr); ASSERT_NE(pClient, nullptr);
SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); SShowMsg* pReq = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
......
/* /**
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * @file vnodeApiTests.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module cluster-msg tests
* @version 0.1
* @date 2021-12-15
* *
* This program is free software: you can use, redistribute, and/or modify * @copyright Copyright (c) 2021
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "deploy.h" #include "deploy.h"
......
/* /**
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * @file vnodeApiTests.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module db-msg tests
* @version 0.1
* @date 2021-12-15
* *
* This program is free software: you can use, redistribute, and/or modify * @copyright Copyright (c) 2021
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "deploy.h" #include "deploy.h"
...@@ -30,7 +26,7 @@ class DndTestDb : public ::testing::Test { ...@@ -30,7 +26,7 @@ class DndTestDb : public ::testing::Test {
const char* firstEp = "localhost:9040"; const char* firstEp = "localhost:9040";
pServer = CreateServer("/tmp/dnode_test_db", fqdn, 9040, firstEp); pServer = CreateServer("/tmp/dnode_test_db", fqdn, 9040, firstEp);
pClient = createClient("root", "taosdata", fqdn, 9040); pClient = createClient("root", "taosdata", fqdn, 9040);
taosMsleep(300); taosMsleep(1100);
} }
static void TearDownTestSuite() { static void TearDownTestSuite() {
...@@ -48,11 +44,12 @@ class DndTestDb : public ::testing::Test { ...@@ -48,11 +44,12 @@ class DndTestDb : public ::testing::Test {
void SetUp() override {} void SetUp() override {}
void TearDown() override {} void TearDown() override {}
void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns) { void SendTheCheckShowMetaMsg(int8_t showType, const char* showName, int32_t columns, const char* db) {
SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg)); SShowMsg* pShow = (SShowMsg*)rpcMallocCont(sizeof(SShowMsg));
pShow->type = showType; pShow->type = showType;
strcpy(pShow->db, ""); if (db != NULL) {
strcpy(pShow->db, db);
}
SRpcMsg showRpcMsg = {0}; SRpcMsg showRpcMsg = {0};
showRpcMsg.pCont = pShow; showRpcMsg.pCont = pShow;
showRpcMsg.contLen = sizeof(SShowMsg); showRpcMsg.contLen = sizeof(SShowMsg);
...@@ -183,31 +180,33 @@ SClient* DndTestDb::pClient; ...@@ -183,31 +180,33 @@ SClient* DndTestDb::pClient;
int32_t DndTestDb::connId; int32_t DndTestDb::connId;
TEST_F(DndTestDb, 01_ShowDb) { TEST_F(DndTestDb, 01_ShowDb) {
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name"); CheckSchema(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name");
CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time"); CheckSchema(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create time");
CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "replica"); CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "vgroups");
CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "quorum"); CheckSchema(3, TSDB_DATA_TYPE_SMALLINT, 2, "replica");
CheckSchema(4, TSDB_DATA_TYPE_SMALLINT, 2, "days"); CheckSchema(4, TSDB_DATA_TYPE_SMALLINT, 2, "quorum");
CheckSchema(5, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2"); CheckSchema(5, TSDB_DATA_TYPE_SMALLINT, 2, "days");
CheckSchema(6, TSDB_DATA_TYPE_INT, 4, "cache(MB)"); CheckSchema(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2");
CheckSchema(7, TSDB_DATA_TYPE_INT, 4, "blocks"); CheckSchema(7, TSDB_DATA_TYPE_INT, 4, "cache(MB)");
CheckSchema(8, TSDB_DATA_TYPE_INT, 4, "minrows"); CheckSchema(8, TSDB_DATA_TYPE_INT, 4, "blocks");
CheckSchema(9, TSDB_DATA_TYPE_INT, 4, "maxrows"); CheckSchema(9, TSDB_DATA_TYPE_INT, 4, "minrows");
CheckSchema(10, TSDB_DATA_TYPE_TINYINT, 1, "wallevel"); CheckSchema(10, TSDB_DATA_TYPE_INT, 4, "maxrows");
CheckSchema(11, TSDB_DATA_TYPE_INT, 4, "fsync"); CheckSchema(11, TSDB_DATA_TYPE_TINYINT, 1, "wallevel");
CheckSchema(12, TSDB_DATA_TYPE_TINYINT, 1, "comp"); CheckSchema(12, TSDB_DATA_TYPE_INT, 4, "fsync");
CheckSchema(13, TSDB_DATA_TYPE_TINYINT, 1, "cachelast"); CheckSchema(13, TSDB_DATA_TYPE_TINYINT, 1, "comp");
CheckSchema(14, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision"); CheckSchema(14, TSDB_DATA_TYPE_TINYINT, 1, "cachelast");
CheckSchema(15, TSDB_DATA_TYPE_TINYINT, 1, "update"); CheckSchema(15, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision");
CheckSchema(16, TSDB_DATA_TYPE_TINYINT, 1, "update");
SendThenCheckShowRetrieveMsg(0); SendThenCheckShowRetrieveMsg(0);
} }
TEST_F(DndTestDb, 02_CreateDb) { TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
{ {
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
strcpy(pReq->db, "1.d1"); strcpy(pReq->db, "1.d1");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16); pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10); pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10); pReq->daysPerFile = htonl(10);
...@@ -238,10 +237,11 @@ TEST_F(DndTestDb, 02_CreateDb) { ...@@ -238,10 +237,11 @@ TEST_F(DndTestDb, 02_CreateDb) {
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
} }
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1); SendThenCheckShowRetrieveMsg(1);
CheckBinary("d1", TSDB_DB_NAME_LEN - 1); CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
CheckTimestamp(); CheckTimestamp();
CheckInt16(2); // vgroups
CheckInt16(1); // replica CheckInt16(1); // replica
CheckInt16(1); // quorum CheckInt16(1); // quorum
CheckInt16(10); // days CheckInt16(10); // days
...@@ -256,9 +256,22 @@ TEST_F(DndTestDb, 02_CreateDb) { ...@@ -256,9 +256,22 @@ TEST_F(DndTestDb, 02_CreateDb) {
CheckInt8(0); // cachelast CheckInt8(0); // cachelast
CheckBinary("ms", 3); // precision CheckBinary("ms", 3); // precision
CheckInt8(0); // update CheckInt8(0); // update
}
TEST_F(DndTestDb, 03_AlterDb) { SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_VGROUP, "show vgroups", 4, "1.d1");
CheckSchema(0, TSDB_DATA_TYPE_INT, 4, "vgId");
CheckSchema(1, TSDB_DATA_TYPE_INT, 4, "tables");
CheckSchema(2, TSDB_DATA_TYPE_SMALLINT, 2, "v1_dnode");
CheckSchema(3, TSDB_DATA_TYPE_BINARY, 9 + VARSTR_HEADER_SIZE, "v1_status");
SendThenCheckShowRetrieveMsg(2);
CheckInt32(1);
CheckInt32(2);
CheckInt32(0);
CheckInt32(0);
CheckInt16(1);
CheckInt16(1);
CheckBinary("master", 9);
CheckBinary("master", 9);
{ {
SAlterDbMsg* pReq = (SAlterDbMsg*)rpcMallocCont(sizeof(SAlterDbMsg)); SAlterDbMsg* pReq = (SAlterDbMsg*)rpcMallocCont(sizeof(SAlterDbMsg));
strcpy(pReq->db, "1.d1"); strcpy(pReq->db, "1.d1");
...@@ -282,10 +295,11 @@ TEST_F(DndTestDb, 03_AlterDb) { ...@@ -282,10 +295,11 @@ TEST_F(DndTestDb, 03_AlterDb) {
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
} }
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1); SendThenCheckShowRetrieveMsg(1);
CheckBinary("d1", TSDB_DB_NAME_LEN - 1); CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
CheckTimestamp(); CheckTimestamp();
CheckInt16(2); // vgroups
CheckInt16(1); // replica CheckInt16(1); // replica
CheckInt16(2); // quorum CheckInt16(2); // quorum
CheckInt16(10); // days CheckInt16(10); // days
...@@ -300,9 +314,8 @@ TEST_F(DndTestDb, 03_AlterDb) { ...@@ -300,9 +314,8 @@ TEST_F(DndTestDb, 03_AlterDb) {
CheckInt8(1); // cachelast CheckInt8(1); // cachelast
CheckBinary("ms", 3); // precision CheckBinary("ms", 3); // precision
CheckInt8(0); // update CheckInt8(0); // update
}
TEST_F(DndTestDb, 04_RestartDnode) { // restart
stopServer(pServer); stopServer(pServer);
pServer = NULL; pServer = NULL;
...@@ -314,10 +327,11 @@ TEST_F(DndTestDb, 04_RestartDnode) { ...@@ -314,10 +327,11 @@ TEST_F(DndTestDb, 04_RestartDnode) {
uInfo("all server is running"); uInfo("all server is running");
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1); SendThenCheckShowRetrieveMsg(1);
CheckBinary("d1", TSDB_DB_NAME_LEN - 1); CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
CheckTimestamp(); CheckTimestamp();
CheckInt16(2); // vgroups
CheckInt16(1); // replica CheckInt16(1); // replica
CheckInt16(2); // quorum CheckInt16(2); // quorum
CheckInt16(10); // days CheckInt16(10); // days
...@@ -332,11 +346,9 @@ TEST_F(DndTestDb, 04_RestartDnode) { ...@@ -332,11 +346,9 @@ TEST_F(DndTestDb, 04_RestartDnode) {
CheckInt8(1); // cachelast CheckInt8(1); // cachelast
CheckBinary("ms", 3); // precision CheckBinary("ms", 3); // precision
CheckInt8(0); // update CheckInt8(0); // update
}
TEST_F(DndTestDb, 05_DropDb) {
{ {
SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(sizeof(SAlterDbMsg)); SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(sizeof(SDropDbMsg));
strcpy(pReq->db, "1.d1"); strcpy(pReq->db, "1.d1");
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
...@@ -350,6 +362,103 @@ TEST_F(DndTestDb, 05_DropDb) { ...@@ -350,6 +362,103 @@ TEST_F(DndTestDb, 05_DropDb) {
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
} }
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 16); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(0); SendThenCheckShowRetrieveMsg(0);
} }
\ No newline at end of file
TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
{
SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg));
strcpy(pReq->db, "1.d2");
pReq->numOfVgroups = htonl(2);
pReq->cacheBlockSize = htonl(16);
pReq->totalBlocks = htonl(10);
pReq->daysPerFile = htonl(10);
pReq->daysToKeep0 = htonl(3650);
pReq->daysToKeep1 = htonl(3650);
pReq->daysToKeep2 = htonl(3650);
pReq->minRowsPerFileBlock = htonl(100);
pReq->maxRowsPerFileBlock = htonl(4096);
pReq->commitTime = htonl(3600);
pReq->fsyncPeriod = htonl(3000);
pReq->walLevel = 1;
pReq->precision = 0;
pReq->compression = 2;
pReq->replications = 1;
pReq->quorum = 1;
pReq->update = 0;
pReq->cacheLastRow = 0;
pReq->ignoreExist = 1;
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DB, "show databases", 17, NULL);
SendThenCheckShowRetrieveMsg(1);
CheckBinary("d2", TSDB_DB_NAME_LEN - 1);
{
SUseDbMsg* pReq = (SUseDbMsg*)rpcMallocCont(sizeof(SUseDbMsg));
strcpy(pReq->db, "1.d2");
pReq->vgVersion = htonl(-1);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SUseDbMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_USE_DB;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
SUseDbRsp* pRsp = (SUseDbRsp*)pMsg->pCont;
EXPECT_STREQ(pRsp->db, "1.d2");
pRsp->vgVersion = htonl(pRsp->vgVersion);
pRsp->vgNum = htonl(pRsp->vgNum);
pRsp->hashMethod = pRsp->hashMethod;
EXPECT_EQ(pRsp->vgVersion, 1);
EXPECT_EQ(pRsp->vgNum, 2);
EXPECT_EQ(pRsp->hashMethod, 1);
{
SVgroupInfo* pInfo = &pRsp->vgroupInfo[0];
pInfo->vgId = htonl(pInfo->vgId);
pInfo->hashBegin = htonl(pInfo->hashBegin);
pInfo->hashEnd = htonl(pInfo->hashEnd);
EXPECT_GT(pInfo->vgId, 0);
EXPECT_EQ(pInfo->hashBegin, 0);
EXPECT_EQ(pInfo->hashEnd, INT32_MAX / 2 - 1);
EXPECT_EQ(pInfo->inUse, 0);
EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9040);
EXPECT_STREQ(pAddr->fqdn, "localhost");
}
{
SVgroupInfo* pInfo = &pRsp->vgroupInfo[1];
pInfo->vgId = htonl(pInfo->vgId);
pInfo->hashBegin = htonl(pInfo->hashBegin);
pInfo->hashEnd = htonl(pInfo->hashEnd);
EXPECT_GT(pInfo->vgId, 0);
EXPECT_EQ(pInfo->hashBegin, INT32_MAX / 2);
EXPECT_EQ(pInfo->hashEnd, INT32_MAX);
EXPECT_EQ(pInfo->inUse, 0);
EXPECT_EQ(pInfo->numOfEps, 1);
SEpAddrMsg* pAddr = &pInfo->epAddr[0];
pAddr->port = htons(pAddr->port);
EXPECT_EQ(pAddr->port, 9040);
EXPECT_STREQ(pAddr->fqdn, "localhost");
}
}
}
/* /**
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * @file vnodeApiTests.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module dnode-msg tests
* @version 0.1
* @date 2021-12-15
* *
* This program is free software: you can use, redistribute, and/or modify * @copyright Copyright (c) 2021
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "deploy.h" #include "deploy.h"
...@@ -224,19 +220,21 @@ TEST_F(DndTestDnode, 02_ConfigDnode) { ...@@ -224,19 +220,21 @@ TEST_F(DndTestDnode, 02_ConfigDnode) {
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
} }
TEST_F(DndTestDnode, 03_CreateDnode) { TEST_F(DndTestDnode, 03_Create_Drop_Reatrt_Dnode) {
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg)); {
strcpy(pReq->ep, "localhost:9042"); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
strcpy(pReq->ep, "localhost:9042");
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq; rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SCreateDnodeMsg); rpcMsg.contLen = sizeof(SCreateDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE; rpcMsg.msgType = TSDB_MSG_TYPE_CREATE_DNODE;
sendMsg(pClient, &rpcMsg); sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp; SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
}
taosMsleep(1300); taosMsleep(1300);
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
...@@ -255,21 +253,21 @@ TEST_F(DndTestDnode, 03_CreateDnode) { ...@@ -255,21 +253,21 @@ TEST_F(DndTestDnode, 03_CreateDnode) {
CheckTimestamp(); CheckTimestamp();
CheckBinary("", 24); CheckBinary("", 24);
CheckBinary("", 24); CheckBinary("", 24);
}
TEST_F(DndTestDnode, 04_DropDnode) { {
SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(sizeof(SDropDnodeMsg)); SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(sizeof(SDropDnodeMsg));
pReq->dnodeId = htonl(2); pReq->dnodeId = htonl(2);
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq; rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SDropDnodeMsg); rpcMsg.contLen = sizeof(SDropDnodeMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_DNODE; rpcMsg.msgType = TSDB_MSG_TYPE_DROP_DNODE;
sendMsg(pClient, &rpcMsg); sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp; SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
SendThenCheckShowRetrieveMsg(1); SendThenCheckShowRetrieveMsg(1);
...@@ -280,9 +278,7 @@ TEST_F(DndTestDnode, 04_DropDnode) { ...@@ -280,9 +278,7 @@ TEST_F(DndTestDnode, 04_DropDnode) {
CheckBinary("ready", 10); CheckBinary("ready", 10);
CheckTimestamp(); CheckTimestamp();
CheckBinary("", 24); CheckBinary("", 24);
}
TEST_F(DndTestDnode, 05_CreateDnode) {
{ {
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg)); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(sizeof(SCreateDnodeMsg));
strcpy(pReq->ep, "localhost:9043"); strcpy(pReq->ep, "localhost:9043");
...@@ -359,9 +355,8 @@ TEST_F(DndTestDnode, 05_CreateDnode) { ...@@ -359,9 +355,8 @@ TEST_F(DndTestDnode, 05_CreateDnode) {
CheckBinary("", 24); CheckBinary("", 24);
CheckBinary("", 24); CheckBinary("", 24);
CheckBinary("", 24); CheckBinary("", 24);
}
TEST_F(DndTestDnode, 06_RestartDnode) { // restart
uInfo("stop all server"); uInfo("stop all server");
stopServer(pServer1); stopServer(pServer1);
stopServer(pServer2); stopServer(pServer2);
......
/* /**
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * @file vnodeApiTests.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module profile-msg tests
* @version 0.1
* @date 2021-12-15
* *
* This program is free software: you can use, redistribute, and/or modify * @copyright Copyright (c) 2021
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "deploy.h" #include "deploy.h"
......
/* /**
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * @file vnodeApiTests.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module show-msg tests
* @version 0.1
* @date 2021-12-15
* *
* This program is free software: you can use, redistribute, and/or modify * @copyright Copyright (c) 2021
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "deploy.h" #include "deploy.h"
......
/* /**
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * @file vnodeApiTests.cpp
* @author slguan (slguan@taosdata.com)
* @brief DNODE module user-msg tests
* @version 0.1
* @date 2021-12-15
* *
* This program is free software: you can use, redistribute, and/or modify * @copyright Copyright (c) 2021
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "deploy.h" #include "deploy.h"
...@@ -184,7 +180,7 @@ TEST_F(DndTestUser, 01_ShowUser) { ...@@ -184,7 +180,7 @@ TEST_F(DndTestUser, 01_ShowUser) {
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
} }
TEST_F(DndTestUser, 02_CreateUser) { TEST_F(DndTestUser, 02_Create_Drop_Alter_User) {
{ {
SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg)); SCreateUserMsg* pReq = (SCreateUserMsg*)rpcMallocCont(sizeof(SCreateUserMsg));
strcpy(pReq->user, "u1"); strcpy(pReq->user, "u1");
...@@ -231,23 +227,22 @@ TEST_F(DndTestUser, 02_CreateUser) { ...@@ -231,23 +227,22 @@ TEST_F(DndTestUser, 02_CreateUser) {
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
}
TEST_F(DndTestUser, 03_AlterUser) { {
SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg)); SAlterUserMsg* pReq = (SAlterUserMsg*)rpcMallocCont(sizeof(SAlterUserMsg));
strcpy(pReq->user, "u1"); strcpy(pReq->user, "u1");
strcpy(pReq->pass, "p2"); strcpy(pReq->pass, "p2");
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = sizeof(SAlterUserMsg);
rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_USER;
sendMsg(pClient, &rpcMsg); SRpcMsg rpcMsg = {0};
SRpcMsg* pMsg = pClient->pRsp; rpcMsg.pCont = pReq;
ASSERT_NE(pMsg, nullptr); rpcMsg.contLen = sizeof(SAlterUserMsg);
ASSERT_EQ(pMsg->code, 0); rpcMsg.msgType = TSDB_MSG_TYPE_ALTER_USER;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SendThenCheckShowRetrieveMsg(3); SendThenCheckShowRetrieveMsg(3);
CheckBinary("u1", TSDB_USER_LEN); CheckBinary("u1", TSDB_USER_LEN);
...@@ -262,22 +257,21 @@ TEST_F(DndTestUser, 03_AlterUser) { ...@@ -262,22 +257,21 @@ TEST_F(DndTestUser, 03_AlterUser) {
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
}
TEST_F(DndTestUser, 04_DropUser) {
SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg));
strcpy(pReq->user, "u1");
SRpcMsg rpcMsg = {0}; {
rpcMsg.pCont = pReq; SDropUserMsg* pReq = (SDropUserMsg*)rpcMallocCont(sizeof(SDropUserMsg));
rpcMsg.contLen = sizeof(SDropUserMsg); strcpy(pReq->user, "u1");
rpcMsg.msgType = TSDB_MSG_TYPE_DROP_USER;
sendMsg(pClient, &rpcMsg); SRpcMsg rpcMsg = {0};
SRpcMsg* pMsg = pClient->pRsp; rpcMsg.pCont = pReq;
ASSERT_NE(pMsg, nullptr); rpcMsg.contLen = sizeof(SDropUserMsg);
ASSERT_EQ(pMsg->code, 0); rpcMsg.msgType = TSDB_MSG_TYPE_DROP_USER;
sendMsg(pClient, &rpcMsg);
SRpcMsg* pMsg = pClient->pRsp;
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0);
}
SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_USER, "show users", 4);
SendThenCheckShowRetrieveMsg(2); SendThenCheckShowRetrieveMsg(2);
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
...@@ -288,9 +282,8 @@ TEST_F(DndTestUser, 04_DropUser) { ...@@ -288,9 +282,8 @@ TEST_F(DndTestUser, 04_DropUser) {
CheckTimestamp(); CheckTimestamp();
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
CheckBinary("root", TSDB_USER_LEN); CheckBinary("root", TSDB_USER_LEN);
}
TEST_F(DndTestUser, 05_RestartDnode) { // restart
stopServer(pServer); stopServer(pServer);
pServer = NULL; pServer = NULL;
......
...@@ -42,13 +42,6 @@ extern int32_t mDebugFlag; ...@@ -42,13 +42,6 @@ extern int32_t mDebugFlag;
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} #define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }} #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); }}
typedef struct SClusterObj SClusterObj;
typedef struct SMnodeObj SMnodeObj;
typedef struct SAcctObj SAcctObj;
typedef struct SVgObj SVgObj;
typedef struct SFuncObj SFuncObj;
typedef struct SOperObj SOperObj;
typedef enum { typedef enum {
MND_AUTH_ACCT_START = 0, MND_AUTH_ACCT_START = 0,
MND_AUTH_ACCT_USER, MND_AUTH_ACCT_USER,
...@@ -99,7 +92,7 @@ typedef enum { ...@@ -99,7 +92,7 @@ typedef enum {
DND_REASON_OTHERS DND_REASON_OTHERS
} EDndReason; } EDndReason;
typedef struct STrans { typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
...@@ -111,7 +104,7 @@ typedef struct STrans { ...@@ -111,7 +104,7 @@ typedef struct STrans {
SArray *undoActions; SArray *undoActions;
} STrans; } STrans;
typedef struct SClusterObj { typedef struct {
int32_t id; int32_t id;
char name[TSDB_CLUSTER_ID_LEN]; char name[TSDB_CLUSTER_ID_LEN];
int64_t createdTime; int64_t createdTime;
...@@ -139,7 +132,7 @@ typedef struct { ...@@ -139,7 +132,7 @@ typedef struct {
char ep[TSDB_EP_LEN]; char ep[TSDB_EP_LEN];
} SDnodeObj; } SDnodeObj;
typedef struct SMnodeObj { typedef struct {
int32_t id; int32_t id;
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
...@@ -167,7 +160,7 @@ typedef struct { ...@@ -167,7 +160,7 @@ typedef struct {
int64_t compStorage; // Compressed storage on disk int64_t compStorage; // Compressed storage on disk
} SAcctInfo; } SAcctInfo;
typedef struct SAcctObj { typedef struct {
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
...@@ -195,8 +188,8 @@ typedef struct { ...@@ -195,8 +188,8 @@ typedef struct {
int32_t daysToKeep0; int32_t daysToKeep0;
int32_t daysToKeep1; int32_t daysToKeep1;
int32_t daysToKeep2; int32_t daysToKeep2;
int32_t minRowsPerFileBlock; int32_t minRows;
int32_t maxRowsPerFileBlock; int32_t maxRows;
int32_t commitTime; int32_t commitTime;
int32_t fsyncPeriod; int32_t fsyncPeriod;
int8_t walLevel; int8_t walLevel;
...@@ -214,7 +207,10 @@ typedef struct { ...@@ -214,7 +207,10 @@ typedef struct {
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
int64_t uid; int64_t uid;
int32_t version; int32_t cfgVersion;
int32_t vgVersion;
int32_t numOfVgroups;
int8_t hashMethod; // default is 1
SDbCfg cfg; SDbCfg cfg;
} SDbObj; } SDbObj;
...@@ -223,12 +219,15 @@ typedef struct { ...@@ -223,12 +219,15 @@ typedef struct {
ESyncState role; ESyncState role;
} SVnodeGid; } SVnodeGid;
typedef struct SVgObj { typedef struct {
int32_t vgId; int32_t vgId;
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
int32_t version; int32_t version;
int32_t hashBegin;
int32_t hashEnd;
char dbName[TSDB_FULL_DB_NAME_LEN]; char dbName[TSDB_FULL_DB_NAME_LEN];
int64_t dbUid;
int32_t numOfTables; int32_t numOfTables;
int32_t numOfTimeSeries; int32_t numOfTimeSeries;
int64_t totalStorage; int64_t totalStorage;
...@@ -252,7 +251,7 @@ typedef struct { ...@@ -252,7 +251,7 @@ typedef struct {
SSchema *pSchema; SSchema *pSchema;
} SStbObj; } SStbObj;
typedef struct SFuncObj { typedef struct {
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN];
int64_t createdTime; int64_t createdTime;
int8_t funcType; int8_t funcType;
......
...@@ -28,6 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId); ...@@ -28,6 +28,7 @@ SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId);
void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode); SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
int32_t mndGetDnodeSize(SMnode *pMnode); int32_t mndGetDnodeSize(SMnode *pMnode);
bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -22,10 +22,13 @@ ...@@ -22,10 +22,13 @@
extern "C" { extern "C" {
#endif #endif
int32_t mndInitVgroup(SMnode *pMnode); int32_t mndInitVgroup(SMnode *pMnode);
void mndCleanupVgroup(SMnode *pMnode); void mndCleanupVgroup(SMnode *pMnode);
SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId);
void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup);
SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h"
#define TSDB_DB_VER_NUM 1 #define TSDB_DB_VER_NUM 1
#define TSDB_DB_RESERVE_SIZE 64 #define TSDB_DB_RESERVE_SIZE 64
...@@ -74,15 +75,18 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { ...@@ -74,15 +75,18 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT64(pRaw, dataPos, pDb->createdTime) SDB_SET_INT64(pRaw, dataPos, pDb->createdTime)
SDB_SET_INT64(pRaw, dataPos, pDb->updateTime) SDB_SET_INT64(pRaw, dataPos, pDb->updateTime)
SDB_SET_INT64(pRaw, dataPos, pDb->uid) SDB_SET_INT64(pRaw, dataPos, pDb->uid)
SDB_SET_INT32(pRaw, dataPos, pDb->version) SDB_SET_INT32(pRaw, dataPos, pDb->cfgVersion)
SDB_SET_INT32(pRaw, dataPos, pDb->vgVersion)
SDB_SET_INT32(pRaw, dataPos, pDb->numOfVgroups)
SDB_SET_INT8(pRaw, dataPos, pDb->hashMethod)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.cacheBlockSize)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.totalBlocks)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysPerFile)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep0)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep1)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep2) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.daysToKeep2)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.minRowsPerFileBlock) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.minRows)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.maxRowsPerFileBlock) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.maxRows)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.commitTime) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.commitTime)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.fsyncPeriod) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.fsyncPeriod)
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.walLevel) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.walLevel)
...@@ -118,15 +122,18 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) { ...@@ -118,15 +122,18 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->updateTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->uid) SDB_GET_INT64(pRaw, pRow, dataPos, &pDb->uid)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->version) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfgVersion)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->vgVersion)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->numOfVgroups)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->hashMethod)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.cacheBlockSize) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.cacheBlockSize)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.totalBlocks) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.totalBlocks)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysPerFile) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysPerFile)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep0) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep0)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep1) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep1)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep2) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.daysToKeep2)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.minRowsPerFileBlock) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.minRows)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.maxRowsPerFileBlock) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.maxRows)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.commitTime) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.commitTime)
SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.fsyncPeriod) SDB_GET_INT32(pRaw, pRow, dataPos, &pDb->cfg.fsyncPeriod)
SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.walLevel) SDB_GET_INT8(pRaw, pRow, dataPos, &pDb->cfg.walLevel)
...@@ -154,6 +161,9 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) { ...@@ -154,6 +161,9 @@ static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb) {
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) { static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOldDb, SDbObj *pNewDb) {
mTrace("db:%s, perform update action", pOldDb->name); mTrace("db:%s, perform update action", pOldDb->name);
pOldDb->updateTime = pNewDb->createdTime; pOldDb->updateTime = pNewDb->createdTime;
pOldDb->cfgVersion = pNewDb->cfgVersion;
pOldDb->vgVersion = pNewDb->vgVersion;
pOldDb->numOfVgroups = pNewDb->numOfVgroups;
memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg)); memcpy(&pOldDb->cfg, &pNewDb->cfg, sizeof(SDbCfg));
return 0; return 0;
} }
...@@ -184,133 +194,30 @@ static int32_t mndCheckDbName(char *dbName, SUserObj *pUser) { ...@@ -184,133 +194,30 @@ static int32_t mndCheckDbName(char *dbName, SUserObj *pUser) {
return 0; return 0;
} }
static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg, char *errMsg, int32_t len) { static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->cacheBlockSize < TSDB_MIN_CACHE_BLOCK_SIZE || pCfg->cacheBlockSize > TSDB_MAX_CACHE_BLOCK_SIZE) { if (pCfg->cacheBlockSize < TSDB_MIN_CACHE_BLOCK_SIZE || pCfg->cacheBlockSize > TSDB_MAX_CACHE_BLOCK_SIZE) return -1;
terrno = TSDB_CODE_MND_INVALID_DB_OPTION; if (pCfg->totalBlocks < TSDB_MIN_TOTAL_BLOCKS || pCfg->totalBlocks > TSDB_MAX_TOTAL_BLOCKS) return -1;
tstrncpy(errMsg, "Invalid database cache block size option", len); if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1;
return -1; if (pCfg->daysToKeep0 < pCfg->daysPerFile) return -1;
} if (pCfg->daysToKeep0 < TSDB_MIN_KEEP || pCfg->daysToKeep0 > TSDB_MAX_KEEP) return -1;
if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > TSDB_MAX_KEEP) return -1;
if (pCfg->totalBlocks < TSDB_MIN_TOTAL_BLOCKS || pCfg->totalBlocks > TSDB_MAX_TOTAL_BLOCKS) { if (pCfg->daysToKeep2 < TSDB_MIN_KEEP || pCfg->daysToKeep2 > TSDB_MAX_KEEP) return -1;
terrno = TSDB_CODE_MND_INVALID_DB_OPTION; if (pCfg->daysToKeep0 > pCfg->daysToKeep1) return -1;
tstrncpy(errMsg, "Invalid database total blocks option", len); if (pCfg->daysToKeep1 > pCfg->daysToKeep2) return -1;
return -1; if (pCfg->minRows < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRows > TSDB_MAX_MIN_ROW_FBLOCK) return -1;
} if (pCfg->maxRows < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRows > TSDB_MAX_MAX_ROW_FBLOCK) return -1;
if (pCfg->minRows > pCfg->maxRows) return -1;
if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) { if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) return -1;
terrno = TSDB_CODE_MND_INVALID_DB_OPTION; if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) return -1;
tstrncpy(errMsg, "Invalid database days option", len); if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) return -1;
return -1; if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1;
} if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1;
if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) return -1;
if (pCfg->daysToKeep0 < pCfg->daysPerFile) { if (pCfg->replications > mndGetDnodeSize(pMnode)) return -1;
terrno = TSDB_CODE_MND_INVALID_DB_OPTION; if (pCfg->quorum < TSDB_MIN_DB_QUORUM_OPTION || pCfg->quorum > TSDB_MAX_DB_QUORUM_OPTION) return -1;
tstrncpy(errMsg, "Invalid database days option", len); if (pCfg->quorum > pCfg->replications) return -1;
return -1; if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) return -1;
} if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
if (pCfg->daysToKeep0 < TSDB_MIN_KEEP || pCfg->daysToKeep0 > TSDB_MAX_KEEP || pCfg->daysToKeep0 > pCfg->daysToKeep1) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database keep0 option", len);
return -1;
}
if (pCfg->daysToKeep1 < TSDB_MIN_KEEP || pCfg->daysToKeep1 > TSDB_MAX_KEEP || pCfg->daysToKeep1 > pCfg->daysToKeep2) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database keep1 option", len);
return -1;
}
if (pCfg->daysToKeep2 < TSDB_MIN_KEEP || pCfg->daysToKeep2 > TSDB_MAX_KEEP) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database keep2 option", len);
return -1;
}
if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database minrows option", len);
return -1;
}
if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database maxrows option", len);
return -1;
}
if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database minrows option", len);
return -1;
}
if (pCfg->commitTime < TSDB_MIN_COMMIT_TIME || pCfg->commitTime > TSDB_MAX_COMMIT_TIME) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database commit option", len);
return -1;
}
if (pCfg->fsyncPeriod < TSDB_MIN_FSYNC_PERIOD || pCfg->fsyncPeriod > TSDB_MAX_FSYNC_PERIOD) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database fsync option", len);
return -1;
}
if (pCfg->walLevel < TSDB_MIN_WAL_LEVEL || pCfg->walLevel > TSDB_MAX_WAL_LEVEL) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database wal level option", len);
return -1;
}
if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid precision option", len);
return -1;
}
if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database compression option", len);
return -1;
}
if (pCfg->replications < TSDB_MIN_DB_REPLICA_OPTION || pCfg->replications > TSDB_MAX_DB_REPLICA_OPTION) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database replication option", len);
return -1;
}
if (pCfg->replications > mndGetDnodeSize(pMnode)) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database replication option", len);
return -1;
}
if (pCfg->quorum < TSDB_MIN_DB_QUORUM_OPTION || pCfg->quorum > TSDB_MAX_DB_QUORUM_OPTION) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database quorum option", len);
return -1;
}
if (pCfg->quorum > pCfg->replications) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database quorum option", len);
return -1;
}
if (pCfg->update < TSDB_MIN_DB_UPDATE || pCfg->update > TSDB_MAX_DB_UPDATE) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database update option", len);
return -1;
}
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
tstrncpy(errMsg, "Invalid database cachelast option", len);
return -1;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -321,8 +228,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -321,8 +228,8 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->daysToKeep0 < 0) pCfg->daysToKeep0 = TSDB_DEFAULT_KEEP; if (pCfg->daysToKeep0 < 0) pCfg->daysToKeep0 = TSDB_DEFAULT_KEEP;
if (pCfg->daysToKeep1 < 0) pCfg->daysToKeep1 = TSDB_DEFAULT_KEEP; if (pCfg->daysToKeep1 < 0) pCfg->daysToKeep1 = TSDB_DEFAULT_KEEP;
if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = TSDB_DEFAULT_KEEP; if (pCfg->daysToKeep2 < 0) pCfg->daysToKeep2 = TSDB_DEFAULT_KEEP;
if (pCfg->minRowsPerFileBlock < 0) pCfg->minRowsPerFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK; if (pCfg->minRows < 0) pCfg->minRows = TSDB_DEFAULT_MIN_ROW_FBLOCK;
if (pCfg->maxRowsPerFileBlock < 0) pCfg->maxRowsPerFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK; if (pCfg->maxRows < 0) pCfg->maxRows = TSDB_DEFAULT_MAX_ROW_FBLOCK;
if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME; if (pCfg->commitTime < 0) pCfg->commitTime = TSDB_DEFAULT_COMMIT_TIME;
if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD; if (pCfg->fsyncPeriod < 0) pCfg->fsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL; if (pCfg->walLevel < 0) pCfg->walLevel = TSDB_DEFAULT_WAL_LEVEL;
...@@ -334,6 +241,48 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -334,6 +241,48 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW; if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
} }
static int32_t mndSetRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL || mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING);
for (int v = 0; v < pDb->numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendRedolog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_CREATING);
}
return 0;
}
static int32_t mndSetUndoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL || mndTransAppendUndolog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_DROPPED);
for (int v = 0; v < pDb->numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendUndolog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_DROPPED);
}
return 0;
}
static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL || mndTransAppendCommitlog(pTrans, pDbRaw) != 0) return -1;
sdbSetRawStatus(pDbRaw, SDB_STATUS_READY);
for (int v = 0; v < pDb->numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
if (pVgRaw == NULL || mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
}
return 0;
}
static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) { static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreate, SUserObj *pUser) {
SDbObj dbObj = {0}; SDbObj dbObj = {0};
tstrncpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN); tstrncpy(dbObj.name, pCreate->db, TSDB_FULL_DB_NAME_LEN);
...@@ -341,14 +290,18 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat ...@@ -341,14 +290,18 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
dbObj.createdTime = taosGetTimestampMs(); dbObj.createdTime = taosGetTimestampMs();
dbObj.updateTime = dbObj.createdTime; dbObj.updateTime = dbObj.createdTime;
dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN); dbObj.uid = mndGenerateUid(dbObj.name, TSDB_FULL_DB_NAME_LEN);
dbObj.numOfVgroups = pCreate->numOfVgroups;
dbObj.hashMethod = 1;
dbObj.cfgVersion = 1;
dbObj.vgVersion = 1;
dbObj.cfg = (SDbCfg){.cacheBlockSize = pCreate->cacheBlockSize, dbObj.cfg = (SDbCfg){.cacheBlockSize = pCreate->cacheBlockSize,
.totalBlocks = pCreate->totalBlocks, .totalBlocks = pCreate->totalBlocks,
.daysPerFile = pCreate->daysPerFile, .daysPerFile = pCreate->daysPerFile,
.daysToKeep0 = pCreate->daysToKeep0, .daysToKeep0 = pCreate->daysToKeep0,
.daysToKeep1 = pCreate->daysToKeep1, .daysToKeep1 = pCreate->daysToKeep1,
.daysToKeep2 = pCreate->daysToKeep2, .daysToKeep2 = pCreate->daysToKeep2,
.minRowsPerFileBlock = pCreate->minRowsPerFileBlock, .minRows = pCreate->minRowsPerFileBlock,
.maxRowsPerFileBlock = pCreate->maxRowsPerFileBlock, .maxRows = pCreate->maxRowsPerFileBlock,
.fsyncPeriod = pCreate->fsyncPeriod, .fsyncPeriod = pCreate->fsyncPeriod,
.commitTime = pCreate->commitTime, .commitTime = pCreate->commitTime,
.precision = pCreate->precision, .precision = pCreate->precision,
...@@ -366,57 +319,59 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat ...@@ -366,57 +319,59 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat
return -1; return -1;
} }
char errMsg[TSDB_ERROR_MSG_LEN] = {0}; if (mndCheckDbCfg(pMnode, &dbObj.cfg) != 0) {
if (mndCheckDbCfg(pMnode, &dbObj.cfg, errMsg, TSDB_ERROR_MSG_LEN) != 0) { terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
mError("db:%s, failed to create since %s", pCreate->db, terrstr()); mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1; return -1;
} }
SVgObj *pVgroups = NULL;
if (mndAllocVgroup(pMnode, &dbObj, &pVgroups) != 0) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1;
}
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) { if (pTrans == NULL) {
mError("db:%s, failed to create since %s", pCreate->db, terrstr()); mError("db:%s, failed to create since %s", pCreate->db, terrstr());
return -1; goto CREATE_DB_OVER;
} }
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db); mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
SSdbRaw *pRedoRaw = mndDbActionEncode(&dbObj); if (mndSetRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER;
mndTransDrop(pTrans);
return -1;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING);
SSdbRaw *pUndoRaw = mndDbActionEncode(&dbObj); if (mndSetUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { mError("trans:%d, failed to set undo log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER;
mndTransDrop(pTrans);
return -1;
} }
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
SSdbRaw *pCommitRaw = mndDbActionEncode(&dbObj); if (mndSetCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) {
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); goto CREATE_DB_OVER;
mndTransDrop(pTrans);
return -1;
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); goto CREATE_DB_OVER;
return -1;
} }
code = 0;
CREATE_DB_OVER:
free(pVgroups);
mndTransDrop(pTrans); mndTransDrop(pTrans);
return 0; return code;
} }
static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessCreateDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SCreateDbMsg *pCreate = pMsg->rpcMsg.pCont; SCreateDbMsg *pCreate = pMsg->rpcMsg.pCont;
pCreate->numOfVgroups = htonl(pCreate->numOfVgroups);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize); pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->totalBlocks = htonl(pCreate->totalBlocks); pCreate->totalBlocks = htonl(pCreate->totalBlocks);
pCreate->daysPerFile = htonl(pCreate->daysPerFile); pCreate->daysPerFile = htonl(pCreate->daysPerFile);
...@@ -573,7 +528,8 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) { ...@@ -573,7 +528,8 @@ static int32_t mndProcessAlterDbMsg(SMnodeMsg *pMsg) {
return code; return code;
} }
dbObj.version++; dbObj.cfgVersion++;
dbObj.updateTime = taosGetTimestampMs();
code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj); code = mndUpdateDb(pMnode, pMsg, pDb, &dbObj);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
...@@ -658,17 +614,69 @@ static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg) { ...@@ -658,17 +614,69 @@ static int32_t mndProcessDropDbMsg(SMnodeMsg *pMsg) {
static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
SUseDbMsg *pUse = pMsg->rpcMsg.pCont; SUseDbMsg *pUse = pMsg->rpcMsg.pCont;
pUse->vgVersion = htonl(pUse->vgVersion);
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); SDbObj *pDb = mndAcquireDb(pMnode, pUse->db);
if (pDb != NULL) { if (pDb == NULL) {
strncpy(pMsg->db, pUse->db, TSDB_FULL_DB_NAME_LEN); terrno = TSDB_CODE_MND_DB_NOT_EXIST;
mndReleaseDb(pMnode, pDb); mError("db:%s, failed to process use db msg since %s", pUse->db, terrstr());
return 0; return -1;
} else { }
mError("db:%s, failed to process use db msg since %s", pMsg->db, terrstr());
return -1; int32_t contLen = sizeof(SUseDbRsp) + pDb->numOfVgroups * sizeof(SVgroupInfo);
SUseDbRsp *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t vindex = 0;
if (pUse->vgVersion < pDb->vgVersion) {
void *pIter = NULL;
while (vindex < pDb->numOfVgroups) {
SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid == pDb->uid) {
SVgroupInfo *pInfo = &pRsp->vgroupInfo[vindex];
pInfo->vgId = htonl(pVgroup->vgId);
pInfo->hashBegin = htonl(pVgroup->hashBegin);
pInfo->hashEnd = htonl(pVgroup->hashEnd);
pInfo->numOfEps = pVgroup->replica;
for (int32_t gid = 0; gid < pVgroup->replica; ++gid) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[gid];
SEpAddrMsg *pEpArrr = &pInfo->epAddr[gid];
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
if (pDnode != NULL) {
memcpy(pEpArrr->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
pEpArrr->port = htons(pDnode->port);
}
mndReleaseDnode(pMnode, pDnode);
if (pVgid->role == TAOS_SYNC_STATE_LEADER) {
pInfo->inUse = gid;
}
}
vindex++;
}
sdbRelease(pSdb, pVgroup);
}
} }
memcpy(pRsp->db, pDb->name, TSDB_FULL_DB_NAME_LEN);
pRsp->vgVersion = htonl(pDb->vgVersion);
pRsp->vgNum = htonl(vindex);
pRsp->hashMethod = pDb->hashMethod;
pMsg->pCont = pRsp;
pMsg->contLen = contLen;
mndReleaseDb(pMnode, pDb);
return 0;
} }
static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) { static int32_t mndProcessSyncDbMsg(SMnodeMsg *pMsg) {
...@@ -718,6 +726,12 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe ...@@ -718,6 +726,12 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++; cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vgroups");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2; pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "replica"); strcpy(pSchema[cols].name, "replica");
...@@ -855,6 +869,10 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 ...@@ -855,6 +869,10 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
*(int64_t *)pWrite = pDb->createdTime; *(int64_t *)pWrite = pDb->createdTime;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDb->numOfVgroups;
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDb->cfg.replications; *(int16_t *)pWrite = pDb->cfg.replications;
cols++; cols++;
...@@ -886,11 +904,11 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 ...@@ -886,11 +904,11 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.minRowsPerFileBlock; *(int32_t *)pWrite = pDb->cfg.minRows;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int32_t *)pWrite = pDb->cfg.maxRowsPerFileBlock; *(int32_t *)pWrite = pDb->cfg.maxRows;
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......
...@@ -218,6 +218,15 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { ...@@ -218,6 +218,15 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
return sdbGetSize(pSdb, SDB_DNODE); return sdbGetSize(pSdb, SDB_DNODE);
} }
bool mndIsDnodeInReadyStatus(SMnode *pMnode, SDnodeObj *pDnode) {
int64_t ms = taosGetTimestampMs();
int64_t interval = ABS(pDnode->lastAccessTime - ms);
if (interval > 3000 * pMnode->cfg.statusInterval) {
return false;
}
return true;
}
static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) { static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t numOfEps) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
......
...@@ -24,11 +24,9 @@ ...@@ -24,11 +24,9 @@
#define TSDB_VGROUP_VER_NUM 1 #define TSDB_VGROUP_VER_NUM 1
#define TSDB_VGROUP_RESERVE_SIZE 64 #define TSDB_VGROUP_RESERVE_SIZE 64
static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup);
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg);
...@@ -70,8 +68,8 @@ int32_t mndInitVgroup(SMnode *pMnode) { ...@@ -70,8 +68,8 @@ int32_t mndInitVgroup(SMnode *pMnode) {
void mndCleanupVgroup(SMnode *pMnode) {} void mndCleanupVgroup(SMnode *pMnode) {}
static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
SSdbRaw *pRaw = sdbAllocRaw(SDB_DB, TSDB_VGROUP_VER_NUM, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE); SSdbRaw *pRaw = sdbAllocRaw(SDB_VGROUP, TSDB_VGROUP_VER_NUM, sizeof(SVgObj) + TSDB_VGROUP_RESERVE_SIZE);
if (pRaw == NULL) return NULL; if (pRaw == NULL) return NULL;
int32_t dataPos = 0; int32_t dataPos = 0;
...@@ -79,7 +77,10 @@ static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { ...@@ -79,7 +77,10 @@ static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime) SDB_SET_INT64(pRaw, dataPos, pVgroup->createdTime)
SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime) SDB_SET_INT64(pRaw, dataPos, pVgroup->updateTime)
SDB_SET_INT32(pRaw, dataPos, pVgroup->version) SDB_SET_INT32(pRaw, dataPos, pVgroup->version)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashBegin)
SDB_SET_INT32(pRaw, dataPos, pVgroup->hashEnd)
SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN) SDB_SET_BINARY(pRaw, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN)
SDB_SET_INT64(pRaw, dataPos, pVgroup->dbUid)
SDB_SET_INT8(pRaw, dataPos, pVgroup->replica) SDB_SET_INT8(pRaw, dataPos, pVgroup->replica)
for (int8_t i = 0; i < pVgroup->replica; ++i) { for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
...@@ -92,7 +93,7 @@ static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) { ...@@ -92,7 +93,7 @@ static SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup) {
return pRaw; return pRaw;
} }
static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
...@@ -102,7 +103,7 @@ static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { ...@@ -102,7 +103,7 @@ static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
return NULL; return NULL;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(SDbObj)); SSdbRow *pRow = sdbAllocRow(sizeof(SVgObj));
SVgObj *pVgroup = sdbGetRowObj(pRow); SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) return NULL; if (pVgroup == NULL) return NULL;
...@@ -111,7 +112,10 @@ static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) { ...@@ -111,7 +112,10 @@ static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->updateTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->updateTime)
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->version) SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->version)
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashBegin)
SDB_GET_INT32(pRaw, pRow, dataPos, &pVgroup->hashEnd)
SDB_GET_BINARY(pRaw, pRow, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN) SDB_GET_BINARY(pRaw, pRow, dataPos, pVgroup->dbName, TSDB_FULL_DB_NAME_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pVgroup->dbUid)
SDB_GET_INT8(pRaw, pRow, dataPos, &pVgroup->replica) SDB_GET_INT8(pRaw, pRow, dataPos, &pVgroup->replica)
for (int8_t i = 0; i < pVgroup->replica; ++i) { for (int8_t i = 0; i < pVgroup->replica; ++i) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
...@@ -133,12 +137,6 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) { ...@@ -133,12 +137,6 @@ static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup) {
return 0; return 0;
} }
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) { static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup) {
mTrace("vgId:%d, perform update action", pOldVgroup->vgId); mTrace("vgId:%d, perform update action", pOldVgroup->vgId);
pOldVgroup->updateTime = pNewVgroup->updateTime; pOldVgroup->updateTime = pNewVgroup->updateTime;
...@@ -158,6 +156,105 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { ...@@ -158,6 +156,105 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
static int32_t mndGetDefaultVgroupSize(SMnode *pMnode) {
// todo
return 2;
}
static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) {
SSdb *pSdb = pMnode->pSdb;
int32_t allocedVnodes = 0;
void *pIter = NULL;
while (allocedVnodes < pVgroup->replica) {
SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
// todo
if (mndIsDnodeInReadyStatus(pMnode, pDnode)) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[allocedVnodes];
pVgid->dnodeId = pDnode->id;
if (pVgroup->replica == 1) {
pVgid->role = TAOS_SYNC_STATE_LEADER;
} else {
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
}
allocedVnodes++;
}
sdbRelease(pSdb, pDnode);
}
if (allocedVnodes != pVgroup->replica) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
return 0;
}
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
if (pDb->numOfVgroups != -1 &&
(pDb->numOfVgroups < TSDB_MIN_VNODES_PER_DB || pDb->numOfVgroups > TSDB_MAX_VNODES_PER_DB)) {
terrno = TSDB_CODE_MND_INVALID_DB_OPTION;
return -1;
}
if (pDb->numOfVgroups == -1) {
pDb->numOfVgroups = mndGetDefaultVgroupSize(pMnode);
if (pDb->numOfVgroups < 0) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
}
SVgObj *pVgroups = calloc(pDb->numOfVgroups, sizeof(SVgObj));
if (pVgroups == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t alloceVgroups = 0;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
int32_t hashMin = 0;
int32_t hashMax = INT32_MAX;
int32_t hashInterval = (hashMax - hashMin) / pDb->numOfVgroups;
for (int32_t v = 0; v < pDb->numOfVgroups; v++) {
SVgObj *pVgroup = &pVgroups[v];
pVgroup->vgId = maxVgId++;
pVgroup->createdTime = taosGetTimestampMs();
pVgroup->updateTime = pVgroups->createdTime;
pVgroup->version = 1;
pVgroup->dbUid = pDb->uid;
pVgroup->hashBegin = hashMin + hashInterval * v;
if (v == pDb->numOfVgroups - 1) {
pVgroup->hashEnd = hashMax;
} else {
pVgroup->hashEnd = hashMin + hashInterval * (v + 1) - 1;
}
memcpy(pVgroup->dbName, pDb->name, TSDB_FULL_DB_NAME_LEN);
pVgroup->replica = pDb->cfg.replications;
if (mndGetAvailableDnode(pMnode, pVgroup) != 0) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
free(pVgroups);
return -1;
}
alloceVgroups++;
}
*ppVgroups = pVgroups;
return 0;
}
static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessSyncVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCompactVnodeRsp(SMnodeMsg *pMsg) { return 0; }
static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) { static int32_t mndGetVgroupMaxReplica(SMnode *pMnode, char *dbName, int8_t *pReplica, int32_t *pNumOfVgroups) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
...@@ -213,7 +310,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg ...@@ -213,7 +310,7 @@ static int32_t mndGetVgroupMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg
cols++; cols++;
for (int32_t i = 0; i < pShow->replica; ++i) { for (int32_t i = 0; i < pShow->replica; ++i) {
pShow->bytes[cols] = 4; pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", i + 1); snprintf(pSchema[cols].name, TSDB_COL_NAME_LEN, "v%d_dnode", i + 1);
pSchema[cols].bytes = htons(pShow->bytes[cols]); pSchema[cols].bytes = htons(pShow->bytes[cols]);
...@@ -273,10 +370,6 @@ static int32_t mndRetrieveVgroups(SMnodeMsg *pMsg, SShowObj *pShow, char *data, ...@@ -273,10 +370,6 @@ static int32_t mndRetrieveVgroups(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
cols++; cols++;
} }
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pVgroup->compact;
cols++;
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
numOfRows++; numOfRows++;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册